You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/01/24 04:24:41 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a change in pull request #6718: [Feature][DataQuality] Add Data quality Module #4283

caishunfeng commented on a change in pull request #6718:
URL: https://github.com/apache/dolphinscheduler/pull/6718#discussion_r790401942



##########
File path: dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.xml
##########
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper">
+    <select id="queryResultListPaging" resultType="org.apache.dolphinscheduler.dao.entity.DqExecuteResult">
+        SELECT a.id, a.process_definition_id, b.name as process_definition_name, b.code as process_definition_code, a.process_instance_id, e.name as process_instance_name,a.task_instance_id, c.name as task_name, a.rule_type, a.rule_name, a.statistics_value, a.comparison_value, a.check_type,
+        a.threshold , cp.type as comparison_type_name,
+        a.operator, a.failure_strategy, a.state, a.user_id, d.user_name, a.create_time, a.update_time
+        FROM t_ds_dq_execute_result a
+        left join (select id,name,code from t_ds_process_definition) b on a.process_definition_id = b.id

Review comment:
       Why use a subquery like `(select id,name,code from t_ds_process_definition) b` instead of left joining `t_ds_process_definition` directly?

##########
File path: dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
##########
@@ -55,7 +57,8 @@
     SQOOP(12, "SQOOP"),
     SEATUNNEL(13, "SEATUNNEL"),
     SWITCH(14, "SWITCH"),
-    PIGEON(15, "PIGEON");
+    PIGEON(15, "PIGEON"),
+    DATA_QUALITY(16, "data_quality");

Review comment:
       ```suggestion
       DATA_QUALITY(16, "DATA_QUALITY");
   ```

##########
File path: dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.xml
##########
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper">
+    <select id="queryResultListPaging" resultType="org.apache.dolphinscheduler.dao.entity.DqExecuteResult">
+        SELECT a.id, a.process_definition_id, b.name as process_definition_name, b.code as process_definition_code, a.process_instance_id, e.name as process_instance_name,a.task_instance_id, c.name as task_name, a.rule_type, a.rule_name, a.statistics_value, a.comparison_value, a.check_type,
+        a.threshold , cp.type as comparison_type_name,
+        a.operator, a.failure_strategy, a.state, a.user_id, d.user_name, a.create_time, a.update_time
+        FROM t_ds_dq_execute_result a
+        left join (select id,name,code from t_ds_process_definition) b on a.process_definition_id = b.id

Review comment:
       Also, I think you could check the processDefinition earlier in the code, which would reduce the number of SQL joins.

##########
File path: dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqRuleInputEntryMapper.xml
##########
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.DqRuleInputEntryMapper">
+
+    <select id="getRuleInputEntryList" resultType="org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry">
+        SELECT * , b.values_map, b.index FROM t_ds_dq_rule_input_entry a join ( SELECT *

Review comment:
       Avoid using `*`.

##########
File path: dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.xml
##########
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper">
+    <select id="queryResultListPaging" resultType="org.apache.dolphinscheduler.dao.entity.DqExecuteResult">
+        SELECT a.id, a.process_definition_id, b.name as process_definition_name, b.code as process_definition_code, a.process_instance_id, e.name as process_instance_name,a.task_instance_id, c.name as task_name, a.rule_type, a.rule_name, a.statistics_value, a.comparison_value, a.check_type,
+        a.threshold , cp.type as comparison_type_name,
+        a.operator, a.failure_strategy, a.state, a.user_id, d.user_name, a.create_time, a.update_time
+        FROM t_ds_dq_execute_result a
+        left join (select id,name,code from t_ds_process_definition) b on a.process_definition_id = b.id
+        left join (select id,name from t_ds_task_instance) c on a.task_instance_id = c.id
+        left join t_ds_process_instance e on a.process_instance_id = e.id
+        left join t_ds_user d on d.id = a.user_id
+        left join t_ds_dq_comparison_type cp on cp.id = a.comparison_type
+        <where>
+            <if test=" searchVal != null and searchVal != ''">
+                and c.name like concat('%', #{searchVal}, '%')
+            </if>
+            <if test="startTime != null ">
+                and a.update_time > #{startTime} and a.update_time <![CDATA[ <=]]> #{endTime}
+            </if>
+            <if test="states != null and states != ''">
+                and a.state in
+                <foreach collection="states" index="index" item="i" open="(" separator="," close=")">
+                    #{i}
+                </foreach>
+            </if>
+            <if test=" userId != 1">
+                and a.user_id = #{userId}
+            </if>
+            <if test=" ruleType != -1">
+                and a.rule_type = #{ruleType}
+            </if>
+        </where>
+        order by a.update_time desc
+    </select>
+
+    <select id="getExecuteResultById" resultType="org.apache.dolphinscheduler.dao.entity.DqExecuteResult">
+        SELECT a.*, b.name as process_definition_name, e.name as process_instance_name, c.name as task_name,
+        cp.type as comparison_type_name, d.user_name
+        FROM t_ds_dq_execute_result a
+        left join (select id,name from t_ds_process_definition) b on a.process_definition_id = b.id

Review comment:
       The same question as above: why use a subquery here instead of left joining `t_ds_process_definition` directly?

##########
File path: dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/DataQualityApplication.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality;
+
+import static org.apache.dolphinscheduler.data.quality.Constants.SPARK_APP_NAME;
+
+import org.apache.dolphinscheduler.data.quality.config.Config;
+import org.apache.dolphinscheduler.data.quality.config.DataQualityConfiguration;
+import org.apache.dolphinscheduler.data.quality.config.EnvConfig;
+import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
+import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment;
+import org.apache.dolphinscheduler.data.quality.utils.JsonUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+/**
+ * DataQualityApplication
+ */
+public class DataQualityApplication {

Review comment:
       Can you add some detailed comments for `DataQualityApplication`?

##########
File path: dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
##########
@@ -994,6 +994,784 @@ CREATE TABLE `t_ds_alert_plugin_instance` (
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
+--
+-- Table structure for table `t_ds_dq_comparison_type`
+--
+DROP TABLE IF EXISTS `t_ds_dq_comparison_type`;
+CREATE TABLE `t_ds_dq_comparison_type` (
+    `id` int(11) NOT NULL AUTO_INCREMENT,
+    `type` varchar(100) NOT NULL,
+    `execute_sql` text DEFAULT NULL,
+    `output_table` varchar(100) DEFAULT NULL,
+    `name` varchar(100) DEFAULT NULL,
+    `create_time` datetime DEFAULT NULL,
+    `update_time` datetime DEFAULT NULL,
+    `is_inner_source` tinyint(1) DEFAULT '0',
+    PRIMARY KEY (`id`)
+)ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+INSERT INTO `t_ds_dq_comparison_type`
+(`id`, `type`, `execute_sql`, `output_table`, `name`, `create_time`, `update_time`, `is_inner_source`)
+VALUES(1, 'FixValue', NULL, NULL, NULL, '2021-06-30 00:00:00.000', '2021-06-30 00:00:00.000', false);
+INSERT INTO `t_ds_dq_comparison_type`
+(`id`, `type`, `execute_sql`, `output_table`, `name`, `create_time`, `update_time`, `is_inner_source`)
+VALUES(2, 'DailyFluctuation', 'select round(avg(statistics_value),2) as day_avg from t_ds_dq_task_statistics_value where data_time >=date_trunc(''DAY'', ${data_time}) and data_time < date_add(date_trunc(''day'', ${data_time}),1) and unique_code = ${unique_code} and statistics_name = ''${statistics_name}''', 'day_range', 'day_range.day_avg', '2021-06-30 00:00:00.000', '2021-06-30 00:00:00.000', true);
+INSERT INTO `t_ds_dq_comparison_type`
+(`id`, `type`, `execute_sql`, `output_table`, `name`, `create_time`, `update_time`, `is_inner_source`)
+VALUES(3, 'WeeklyFluctuation', 'select round(avg(statistics_value),2) as week_avg from t_ds_dq_task_statistics_value where  data_time >= date_trunc(''WEEK'', ${data_time}) and data_time <date_trunc(''day'', ${data_time}) and unique_code = ${unique_code} and statistics_name = ''${statistics_name}''', 'week_range', 'week_range.week_avg', '2021-06-30 00:00:00.000', '2021-06-30 00:00:00.000', true);
+INSERT INTO `t_ds_dq_comparison_type`
+(`id`, `type`, `execute_sql`, `output_table`, `name`, `create_time`, `update_time`, `is_inner_source`)
+VALUES(4, 'MonthlyFluctuation', 'select round(avg(statistics_value),2) as month_avg from t_ds_dq_task_statistics_value where  data_time >= date_trunc(''MONTH'', ${data_time}) and data_time <date_trunc(''day'', ${data_time}) and unique_code = ${unique_code} and statistics_name = ''${statistics_name}''', 'month_range', 'month_range.month_avg', '2021-06-30 00:00:00.000', '2021-06-30 00:00:00.000', true);
+INSERT INTO `t_ds_dq_comparison_type`
+(`id`, `type`, `execute_sql`, `output_table`, `name`, `create_time`, `update_time`, `is_inner_source`)
+VALUES(5, 'Last7DayFluctuation', 'select round(avg(statistics_value),2) as last_7_avg from t_ds_dq_task_statistics_value where  data_time >= date_add(date_trunc(''day'', ${data_time}),-7) and  data_time <date_trunc(''day'', ${data_time}) and unique_code = ${unique_code} and statistics_name = ''${statistics_name}''', 'last_seven_days', 'last_seven_days.last_7_avg', '2021-06-30 00:00:00.000', '2021-06-30 00:00:00.000', true);
+INSERT INTO `t_ds_dq_comparison_type`
+(`id`, `type`, `execute_sql`, `output_table`, `name`, `create_time`, `update_time`, `is_inner_source`)
+VALUES(6, 'Last30DayFluctuation', 'select round(avg(statistics_value),2) as last_30_avg from t_ds_dq_task_statistics_value where  data_time >= date_add(date_trunc(''day'', ${data_time}),-30) and  data_time < date_trunc(''day'', ${data_time}) and unique_code = ${unique_code} and statistics_name = ''${statistics_name}''', 'last_thirty_days', 'last_thirty_days.last_30_avg', '2021-06-30 00:00:00.000', '2021-06-30 00:00:00.000', true);
+INSERT INTO `t_ds_dq_comparison_type`
+(`id`, `type`, `execute_sql`, `output_table`, `name`, `create_time`, `update_time`, `is_inner_source`)
+VALUES(7, 'SrcTableTotalRows', 'SELECT COUNT(*) AS total FROM ${src_table} WHERE (${src_filter})', 'total_count', 'total_count.total', '2021-06-30 00:00:00.000', '2021-06-30 00:00:00.000', false);
+INSERT INTO `t_ds_dq_comparison_type`
+(`id`, `type`, `execute_sql`, `output_table`, `name`, `create_time`, `update_time`, `is_inner_source`)
+VALUES(8, 'TargetTableTotalRows', 'SELECT COUNT(*) AS total FROM ${target_table} WHERE (${target_filter})', 'total_count', 'total_count.total', '2021-06-30 00:00:00.000', '2021-06-30 00:00:00.000', false);
+
+--
+-- Table structure for table `t_ds_dq_execute_result`
+--
+DROP TABLE IF EXISTS `t_ds_dq_execute_result`;
+CREATE TABLE `t_ds_dq_execute_result` (
+    `id` int(11) NOT NULL AUTO_INCREMENT,
+    `process_definition_id` int(11) DEFAULT NULL,
+    `process_instance_id` int(11) DEFAULT NULL,
+    `task_instance_id` int(11) DEFAULT NULL,
+    `rule_type` int(11) DEFAULT NULL,
+    `rule_name` varchar(255) DEFAULT NULL,
+    `statistics_value` double DEFAULT NULL,
+    `comparison_value` double DEFAULT NULL,
+    `check_type` int(11) DEFAULT NULL,
+    `threshold` double DEFAULT NULL,
+    `operator` int(11) DEFAULT NULL,
+    `failure_strategy` int(11) DEFAULT NULL,
+    `state` int(11) DEFAULT NULL,
+    `user_id` int(11) DEFAULT NULL,
+    `comparison_type` int(11) DEFAULT NULL,
+    `error_output_path` text DEFAULT NULL,
+    `create_time` datetime DEFAULT NULL,
+    `update_time` datetime DEFAULT NULL,
+    PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+--
+-- Table structure for table t_ds_dq_rule
+--
+DROP TABLE IF EXISTS `t_ds_dq_rule`;
+CREATE TABLE `t_ds_dq_rule` (
+    `id` int(11) NOT NULL AUTO_INCREMENT,
+    `name` varchar(100) DEFAULT NULL,
+    `type` int(11) DEFAULT NULL,
+    `user_id` int(11) DEFAULT NULL,
+    `create_time` datetime DEFAULT NULL,
+    `update_time` datetime DEFAULT NULL,
+    PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+INSERT INTO `t_ds_dq_rule`
+(`id`, `name`, `type`, `user_id`, `create_time`, `update_time`)
+VALUES(1, '$t(null_check)', 0, 1, '2020-01-12 00:00:00.000', '2020-01-12 00:00:00.000');

Review comment:
       Is it necessary to assign the `id` explicitly, given that the column is AUTO_INCREMENT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org