You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/10/13 10:30:54 UTC
incubator-griffin git commit: [GRIFFIN-205] accuracy matched fraction
Repository: incubator-griffin
Updated Branches:
refs/heads/master cc35024a6 -> 791c502da
[GRIFFIN-205] accuracy matched fraction
https://issues.apache.org/jira/browse/GRIFFIN-205
This pull request covers only the batch DQ process type.
We still need to decide whether it is worth adding "matched fraction" to the streaming process type.
Integration tests for the accuracy transformation were added.
Author: ashutak <as...@griddynamics.com>
Closes #434 from ashutakGG/GRIFFIN-205-accuracy-matchedFraction.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/791c502d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/791c502d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/791c502d
Branch: refs/heads/master
Commit: 791c502dab1d30e3758bd85a35dff545b4ab74aa
Parents: cc35024
Author: ashutak <as...@griddynamics.com>
Authored: Sat Oct 13 18:30:48 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Sat Oct 13 18:30:48 2018 +0800
----------------------------------------------------------------------
.../configuration/dqdefinition/DQConfig.scala | 16 +-
.../dsl/transform/AccuracyExpr2DQSteps.scala | 17 +-
.../src/test/resources/hive/person_table.csv | 2 +
...AccuracyTransformationsIntegrationTest.scala | 180 +++++++++++++++++++
4 files changed, 202 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index b281481..a4cdfc1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -74,9 +74,9 @@ case class DQConfig(@JsonProperty("name") private val name: String,
*/
@JsonInclude(Include.NON_NULL)
case class DataSourceParam( @JsonProperty("name") private val name: String,
- @JsonProperty("baseline") private val baseline: Boolean,
@JsonProperty("connectors") private val connectors: List[DataConnectorParam],
- @JsonProperty("checkpoint") private val checkpoint: Map[String, Any]
+ @JsonProperty("baseline") private val baseline: Boolean = false,
+ @JsonProperty("checkpoint") private val checkpoint: Map[String, Any] = null
) extends Param {
def getName: String = name
def isBaseline: Boolean = if (!baseline.equals(null)) baseline else false
@@ -148,12 +148,12 @@ case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[Rul
@JsonInclude(Include.NON_NULL)
case class RuleParam(@JsonProperty("dsl.type") private val dslType: String,
@JsonProperty("dq.type") private val dqType: String,
- @JsonProperty("in.dataframe.name") private val inDfName: String,
- @JsonProperty("out.dataframe.name") private val outDfName: String,
- @JsonProperty("rule") private val rule: String,
- @JsonProperty("details") private val details: Map[String, Any],
- @JsonProperty("cache") private val cache: Boolean,
- @JsonProperty("out") private val outputs: List[RuleOutputParam]
+ @JsonProperty("in.dataframe.name") private val inDfName: String = null,
+ @JsonProperty("out.dataframe.name") private val outDfName: String = null,
+ @JsonProperty("rule") private val rule: String = null,
+ @JsonProperty("details") private val details: Map[String, Any] = null,
+ @JsonProperty("cache") private val cache: Boolean = false,
+ @JsonProperty("out") private val outputs: List[RuleOutputParam] = null
) extends Param {
def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 3bf7d04..f7ff3ef 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -44,6 +44,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val _miss = "miss"
val _total = "total"
val _matched = "matched"
+ val _matchedFraction = "matchedFraction"
}
import AccuracyKeys._
@@ -125,14 +126,20 @@ case class AccuracyExpr2DQSteps(context: DQContext,
// 4. accuracy metric
val accuracyTableName = ruleParam.getOutDfName()
val matchedColName = details.getStringOrKey(_matched)
+ val matchedFractionColName = details.getStringOrKey(_matchedFraction)
val accuracyMetricSql = procType match {
case BatchProcessType =>
s"""
- |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
- |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
- |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
- |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
- """.stripMargin
+ SELECT A.total AS `${totalColName}`,
+ A.miss AS `${missColName}`,
+ (A.total - A.miss) AS `${matchedColName}`,
+ coalesce( (A.total - A.miss) / A.total, 1.0) AS `${matchedFractionColName}`
+ FROM (
+ SELECT `${totalCountTableName}`.`${totalColName}` AS total,
+ coalesce(`${missCountTableName}`.`${missColName}`, 0) AS miss
+ FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
+ ) AS A
+ """
case StreamingProcessType =>
s"""
|SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/test/resources/hive/person_table.csv
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/hive/person_table.csv b/measure/src/test/resources/hive/person_table.csv
new file mode 100644
index 0000000..bde6b3e
--- /dev/null
+++ b/measure/src/test/resources/hive/person_table.csv
@@ -0,0 +1,2 @@
+Joey,14
+Ivan,32
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
new file mode 100644
index 0000000..129f0c5
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -0,0 +1,180 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.transformations
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.spark.sql.DataFrame
+import org.scalatest._
+
+import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.context.{ContextId, DQContext}
+import org.apache.griffin.measure.datasource.DataSourceFactory
+import org.apache.griffin.measure.job.builder.DQJobBuilder
+
+case class AccuracyResult(total: Long, miss: Long, matched: Long, matchedFraction: Double)
+
+class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with DataFrameSuiteBase {
+ import spark.implicits._
+
+ private val EMPTY_PERSON_TABLE = "empty_person"
+ private val PERSON_TABLE = "person"
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ dropTables()
+ createPersonTable()
+ createEmptyPersonTable()
+
+ spark.conf.set("spark.sql.crossJoin.enabled", "true")
+ }
+
+ override def afterAll(): Unit = {
+ dropTables()
+ super.afterAll()
+ }
+
+ "accuracy" should "basically work" in {
+ checkAccuracy(
+ sourceName = PERSON_TABLE,
+ targetName = PERSON_TABLE,
+ expectedResult = AccuracyResult(total = 2, miss = 0, matched = 2, matchedFraction = 1.0))
+ }
+
+ "accuracy" should "work with empty target" in {
+ checkAccuracy(
+ sourceName = PERSON_TABLE,
+ targetName = EMPTY_PERSON_TABLE,
+ expectedResult = AccuracyResult(total = 2, miss = 2, matched = 0, matchedFraction = 0.0)
+ )
+ }
+
+ "accuracy" should "work with empty source" in {
+ checkAccuracy(
+ sourceName = EMPTY_PERSON_TABLE,
+ targetName = PERSON_TABLE,
+ expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0, matchedFraction = 1.0))
+ }
+
+ "accuracy" should "work with empty source and target" in {
+ checkAccuracy(
+ sourceName = EMPTY_PERSON_TABLE,
+ targetName = EMPTY_PERSON_TABLE,
+ expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0, matchedFraction = 1.0))
+ }
+
+ private def checkAccuracy(sourceName: String, targetName: String, expectedResult: AccuracyResult) = {
+ val dqContext: DQContext = getDqContext(
+ dataSourcesParam = List(
+ DataSourceParam(
+ name = "source",
+ connectors = List(dataConnectorParam(tableName = sourceName))
+ ),
+ DataSourceParam(
+ name = "target",
+ connectors = List(dataConnectorParam(tableName = targetName))
+ )
+ ))
+
+ val accuracyRule = RuleParam(
+ dslType = "griffin-dsl",
+ dqType = "ACCURACY",
+ outDfName = "person_accuracy",
+ rule = "source.name = target.name"
+ )
+
+ val res = getRuleResults(dqContext, accuracyRule)
+ .as[AccuracyResult]
+ .collect()
+
+ res.length shouldBe 1
+
+ res(0) shouldEqual expectedResult
+ }
+
+ private def getRuleResults(dqContext: DQContext, rule: RuleParam): DataFrame = {
+ val dqJob = DQJobBuilder.buildDQJob(
+ dqContext,
+ evaluateRuleParam = EvaluateRuleParam(List(rule))
+ )
+
+ dqJob.execute(dqContext)
+
+ spark.sql(s"select * from ${rule.getOutDfName()}")
+ }
+
+ private def createPersonTable(): Unit = {
+ val personCsvPath = getClass.getResource("/hive/person_table.csv").getFile
+
+ spark.sql(
+ s"CREATE TABLE ${PERSON_TABLE} " +
+ "( " +
+ " name String," +
+ " age int " +
+ ") " +
+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " +
+ "STORED AS TEXTFILE"
+ )
+
+ spark.sql(s"LOAD DATA LOCAL INPATH '$personCsvPath' OVERWRITE INTO TABLE ${PERSON_TABLE}")
+ }
+
+ private def createEmptyPersonTable(): Unit = {
+ spark.sql(
+ s"CREATE TABLE ${EMPTY_PERSON_TABLE} " +
+ "( " +
+ " name String," +
+ " age int " +
+ ") " +
+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " +
+ "STORED AS TEXTFILE"
+ )
+
+ spark.sql(s"select * from ${EMPTY_PERSON_TABLE}").show()
+ }
+
+ private def dropTables(): Unit = {
+ spark.sql(s"DROP TABLE IF EXISTS ${PERSON_TABLE} ")
+ spark.sql(s"DROP TABLE IF EXISTS ${EMPTY_PERSON_TABLE} ")
+ }
+
+ private def getDqContext(dataSourcesParam: Seq[DataSourceParam], name: String = "test-context"): DQContext = {
+ val dataSources = DataSourceFactory.getDataSources(spark, null, dataSourcesParam)
+ dataSources.foreach(_.init())
+
+ DQContext(
+ ContextId(System.currentTimeMillis),
+ name,
+ dataSources,
+ Nil,
+ BatchProcessType
+ )(spark)
+ }
+
+ private def dataConnectorParam(tableName: String) = {
+ DataConnectorParam(
+ conType = "HIVE",
+ version = null,
+ dataFrameName = null,
+ config = Map("table.name" -> tableName),
+ preProc = null
+ )
+ }
+}