You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by li...@apache.org on 2018/04/26 08:12:34 UTC
[11/50] [abbrv] incubator-griffin git commit: completeness streaming pass
completeness streaming pass
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/18bbf241
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/18bbf241
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/18bbf241
Branch: refs/heads/griffin-0.2.0-incubating-rc4
Commit: 18bbf241f2988a942b228a5f971d389ebd47f84b
Parents: 2e77168
Author: Lionel Liu <bh...@163.com>
Authored: Tue Apr 10 12:52:48 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Tue Apr 10 12:52:48 2018 +0800
----------------------------------------------------------------------
.../measure/process/engine/SparkSqlEngine.scala | 4 +-
.../rule/trans/AccuracyRulePlanTrans.scala | 4 +-
.../rule/trans/CompletenessRulePlanTrans.scala | 8 +--
.../_completeness-streaming-griffindsl.json | 64 ++++++++++++++++++++
4 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index 438595b..ce85e7a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -47,8 +47,8 @@ case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
}
} else sqlContext.sql(rule)
-// println(name)
-// rdf.show(3)
+ println(name)
+ rdf.show(30)
if (rs.isGlobal) {
if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
index 904b087..ec746d2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
@@ -119,7 +119,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
s"""
|SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
|coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
- |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+ |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
|FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
""".stripMargin
}
@@ -128,7 +128,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
|SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
|`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
|coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
- |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+ |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
|FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
|ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}`
""".stripMargin
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
index 5b1a893..1b89587 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
@@ -89,11 +89,11 @@ case class CompletenessRulePlanTrans(dataSourceNames: Seq[String],
val incompleteRecordExport = genRecordExport(recordParam, incompleteRecordsTableName, incompleteRecordsTableName, ct, mode)
// 3. incomplete count
- val incompleteCountTableName = "__missCount"
+ val incompleteCountTableName = "__incompleteCount"
val incompleteColName = details.getStringOrKey(_incomplete)
val incompleteCountSql = procType match {
case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
- case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${incompleteCountTableName}` FROM `${incompleteRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
+ case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
}
val incompleteCountStep = SparkSqlStep(incompleteCountTableName, incompleteCountSql, emptyMap)
@@ -114,7 +114,7 @@ case class CompletenessRulePlanTrans(dataSourceNames: Seq[String],
s"""
|SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
|coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
- |(`${totalColName}` - `${incompleteColName}`) AS `${completeColName}`
+ |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
|FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
""".stripMargin
}
@@ -123,7 +123,7 @@ case class CompletenessRulePlanTrans(dataSourceNames: Seq[String],
|SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
|`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
|coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
- |(`${totalColName}` - `${incompleteColName}`) AS `${completeColName}`
+ |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
|FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
|ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${incompleteCountTableName}`.`${InternalColumns.tmst}`
""".stripMargin
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
new file mode 100644
index 0000000..df1b889
--- /dev/null
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -0,0 +1,64 @@
+{
+ "name": "comp_streaming",
+
+ "process.type": "streaming",
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "kafka",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.147.177.107:9092",
+ "group.id": "source",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
+ },
+ "topics": "test",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-opr",
+ "name": "${s1}",
+ "rule": "from_json",
+ "details": {
+ "df.name": "${this}"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "${this}",
+ "rule": "select name, age from ${s1}"
+ }
+ ]
+ }
+ ],
+ "cache": {
+ "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+ "info.path": "old",
+ "ready.time.interval": "10s",
+ "ready.time.delay": "0",
+ "time.range": ["0", "0"]
+ }
+ }
+ ],
+
+ "evaluate.rule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "completeness",
+ "name": "comp",
+ "rule": "name, age",
+ "metric": {
+ "name": "comp"
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file