You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by li...@apache.org on 2018/04/26 08:12:34 UTC

[11/50] [abbrv] incubator-griffin git commit: completeness streaming pass

completeness streaming pass


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/18bbf241
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/18bbf241
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/18bbf241

Branch: refs/heads/griffin-0.2.0-incubating-rc4
Commit: 18bbf241f2988a942b228a5f971d389ebd47f84b
Parents: 2e77168
Author: Lionel Liu <bh...@163.com>
Authored: Tue Apr 10 12:52:48 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Tue Apr 10 12:52:48 2018 +0800

----------------------------------------------------------------------
 .../measure/process/engine/SparkSqlEngine.scala |  4 +-
 .../rule/trans/AccuracyRulePlanTrans.scala      |  4 +-
 .../rule/trans/CompletenessRulePlanTrans.scala  |  8 +--
 .../_completeness-streaming-griffindsl.json     | 64 ++++++++++++++++++++
 4 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index 438595b..ce85e7a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -47,8 +47,8 @@ case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
             }
           } else sqlContext.sql(rule)
 
-//          println(name)
-//          rdf.show(3)
+          println(name)
+          rdf.show(30)
 
           if (rs.isGlobal) {
             if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
index 904b087..ec746d2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
@@ -119,7 +119,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
           s"""
              |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
              |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
-             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
              |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
          """.stripMargin
         }
@@ -128,7 +128,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
              |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
              |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
              |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
-             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
              |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
              |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}`
          """.stripMargin

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
index 5b1a893..1b89587 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
@@ -89,11 +89,11 @@ case class CompletenessRulePlanTrans(dataSourceNames: Seq[String],
       val incompleteRecordExport = genRecordExport(recordParam, incompleteRecordsTableName, incompleteRecordsTableName, ct, mode)
 
       // 3. incomplete count
-      val incompleteCountTableName = "__missCount"
+      val incompleteCountTableName = "__incompleteCount"
       val incompleteColName = details.getStringOrKey(_incomplete)
       val incompleteCountSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
-        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${incompleteCountTableName}` FROM `${incompleteRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
+        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
       }
       val incompleteCountStep = SparkSqlStep(incompleteCountTableName, incompleteCountSql, emptyMap)
 
@@ -114,7 +114,7 @@ case class CompletenessRulePlanTrans(dataSourceNames: Seq[String],
           s"""
              |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
              |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
-             |(`${totalColName}` - `${incompleteColName}`) AS `${completeColName}`
+             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
              |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
          """.stripMargin
         }
@@ -123,7 +123,7 @@ case class CompletenessRulePlanTrans(dataSourceNames: Seq[String],
              |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
              |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
              |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
-             |(`${totalColName}` - `${incompleteColName}`) AS `${completeColName}`
+             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
              |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
              |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${incompleteCountTableName}`.`${InternalColumns.tmst}`
          """.stripMargin

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
new file mode 100644
index 0000000..df1b889
--- /dev/null
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -0,0 +1,64 @@
+{
+  "name": "comp_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.147.177.107:9092",
+              "group.id": "source",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "test",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "completeness",
+        "name": "comp",
+        "rule": "name, age",
+        "metric": {
+          "name": "comp"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file