Posted to commits@griffin.apache.org by gu...@apache.org on 2018/10/13 10:30:54 UTC

incubator-griffin git commit: [GRIFFIN-205] accuracy matched fraction

Repository: incubator-griffin
Updated Branches:
  refs/heads/master cc35024a6 -> 791c502da


[GRIFFIN-205] accuracy matched fraction

https://issues.apache.org/jira/browse/GRIFFIN-205

This pull request covers only the batch DQ type.
We still need to decide whether it is worth adding "matched fraction" to the streaming type.

Accuracy transformation tests added.
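
For context, the change extends the batch accuracy metric with one new field. An illustrative metric row (field names from this commit; the values are the ones asserted by the new integration tests below, not real output):

    // Illustrative only: batch accuracy metric after this change.
    val metric = Map(
      "total" -> 2L,
      "miss" -> 0L,
      "matched" -> 2L,
      "matchedFraction" -> 1.0)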

Author: ashutak <as...@griddynamics.com>

Closes #434 from ashutakGG/GRIFFIN-205-accuracy-matchedFraction.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/791c502d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/791c502d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/791c502d

Branch: refs/heads/master
Commit: 791c502dab1d30e3758bd85a35dff545b4ab74aa
Parents: cc35024
Author: ashutak <as...@griddynamics.com>
Authored: Sat Oct 13 18:30:48 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Sat Oct 13 18:30:48 2018 +0800

----------------------------------------------------------------------
 .../configuration/dqdefinition/DQConfig.scala   |  16 +-
 .../dsl/transform/AccuracyExpr2DQSteps.scala    |  17 +-
 .../src/test/resources/hive/person_table.csv    |   2 +
 ...AccuracyTransformationsIntegrationTest.scala | 180 +++++++++++++++++++
 4 files changed, 202 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index b281481..a4cdfc1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -74,9 +74,9 @@ case class DQConfig(@JsonProperty("name") private val name: String,
   */
 @JsonInclude(Include.NON_NULL)
 case class DataSourceParam( @JsonProperty("name") private val name: String,
-                            @JsonProperty("baseline") private val baseline: Boolean,
                             @JsonProperty("connectors") private val connectors: List[DataConnectorParam],
-                            @JsonProperty("checkpoint") private val checkpoint: Map[String, Any]
+                            @JsonProperty("baseline") private val baseline: Boolean = false,
+                            @JsonProperty("checkpoint") private val checkpoint: Map[String, Any] = null
                           ) extends Param {
   def getName: String = name
   def isBaseline: Boolean = if (!baseline.equals(null)) baseline else false
@@ -148,12 +148,12 @@ case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[Rul
 @JsonInclude(Include.NON_NULL)
 case class RuleParam(@JsonProperty("dsl.type") private val dslType: String,
                      @JsonProperty("dq.type") private val dqType: String,
-                     @JsonProperty("in.dataframe.name") private val inDfName: String,
-                     @JsonProperty("out.dataframe.name") private val outDfName: String,
-                     @JsonProperty("rule") private val rule: String,
-                     @JsonProperty("details") private val details: Map[String, Any],
-                     @JsonProperty("cache") private val cache: Boolean,
-                     @JsonProperty("out") private val outputs: List[RuleOutputParam]
+                     @JsonProperty("in.dataframe.name") private val inDfName: String = null,
+                     @JsonProperty("out.dataframe.name") private val outDfName: String = null,
+                     @JsonProperty("rule") private val rule: String = null,
+                     @JsonProperty("details") private val details: Map[String, Any] = null,
+                     @JsonProperty("cache") private val cache: Boolean = false,
+                     @JsonProperty("out") private val outputs: List[RuleOutputParam] = null
                     ) extends Param {
   def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
   def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
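
A minimal usage sketch of the new defaults (not part of the commit; `connector` stands in for a real DataConnectorParam): optional fields can now be omitted at construction time, which is what the new test below relies on.

    // Sketch only: with the defaults above, baseline falls back to false,
    // checkpoint to null, and the optional RuleParam fields likewise.
    val source = DataSourceParam(
      name = "source",
      connectors = List(connector))

    val rule = RuleParam(
      dslType = "griffin-dsl",
      dqType = "ACCURACY",
      outDfName = "person_accuracy",
      rule = "source.name = target.name")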

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 3bf7d04..f7ff3ef 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -44,6 +44,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
     val _miss = "miss"
     val _total = "total"
     val _matched = "matched"
+    val _matchedFraction = "matchedFraction"
   }
   import AccuracyKeys._
 
@@ -125,14 +126,20 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       // 4. accuracy metric
       val accuracyTableName = ruleParam.getOutDfName()
       val matchedColName = details.getStringOrKey(_matched)
+      val matchedFractionColName = details.getStringOrKey(_matchedFraction)
       val accuracyMetricSql = procType match {
         case BatchProcessType =>
           s"""
-             |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
-             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
-             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
-             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
-         """.stripMargin
+             SELECT A.total AS `${totalColName}`,
+                    A.miss AS `${missColName}`,
+                    (A.total - A.miss) AS `${matchedColName}`,
+                    coalesce( (A.total - A.miss) / A.total, 1.0) AS `${matchedFractionColName}`
+             FROM (
+               SELECT `${totalCountTableName}`.`${totalColName}` AS total,
+                      coalesce(`${missCountTableName}`.`${missColName}`, 0) AS miss
+               FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
+             ) AS A
+         """
         case StreamingProcessType =>
           s"""
              |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
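
The coalesce in the batch SQL above guards the empty-source case: when A.total is 0 the division yields NULL, which is coalesced to 1.0, so an empty source counts as fully matched (as the new tests assert). A minimal Scala sketch of the same semantics, not part of the commit:

    // (total - miss) / total, defaulting to 1.0 when total is 0.
    def matchedFraction(total: Long, miss: Long): Double =
      if (total == 0L) 1.0 else (total - miss).toDouble / total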

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/test/resources/hive/person_table.csv
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/hive/person_table.csv b/measure/src/test/resources/hive/person_table.csv
new file mode 100644
index 0000000..bde6b3e
--- /dev/null
+++ b/measure/src/test/resources/hive/person_table.csv
@@ -0,0 +1,2 @@
+Joey,14
+Ivan,32

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
new file mode 100644
index 0000000..129f0c5
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -0,0 +1,180 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.transformations
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.spark.sql.DataFrame
+import org.scalatest._
+
+import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.context.{ContextId, DQContext}
+import org.apache.griffin.measure.datasource.DataSourceFactory
+import org.apache.griffin.measure.job.builder.DQJobBuilder
+
+case class AccuracyResult(total: Long, miss: Long, matched: Long, matchedFraction: Double)
+
+class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with DataFrameSuiteBase {
+  import spark.implicits._
+
+  private val EMPTY_PERSON_TABLE = "empty_person"
+  private val PERSON_TABLE = "person"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    dropTables()
+    createPersonTable()
+    createEmptyPersonTable()
+
+    spark.conf.set("spark.sql.crossJoin.enabled", "true")
+  }
+
+  override def afterAll(): Unit = {
+    dropTables()
+    super.afterAll()
+  }
+
+  "accuracy" should "basically work" in {
+    checkAccuracy(
+      sourceName = PERSON_TABLE,
+      targetName = PERSON_TABLE,
+      expectedResult = AccuracyResult(total = 2, miss = 0, matched = 2, matchedFraction = 1.0))
+  }
+
+  "accuracy" should "work with empty target" in {
+    checkAccuracy(
+      sourceName = PERSON_TABLE,
+      targetName = EMPTY_PERSON_TABLE,
+      expectedResult = AccuracyResult(total = 2, miss = 2, matched = 0, matchedFraction = 0.0)
+    )
+  }
+
+  "accuracy" should "work with empty source" in {
+    checkAccuracy(
+      sourceName = EMPTY_PERSON_TABLE,
+      targetName = PERSON_TABLE,
+      expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0, matchedFraction = 1.0))
+  }
+
+  "accuracy" should "work with empty source and target" in {
+    checkAccuracy(
+      sourceName = EMPTY_PERSON_TABLE,
+      targetName = EMPTY_PERSON_TABLE,
+      expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0, matchedFraction = 1.0))
+  }
+
+  private def checkAccuracy(sourceName: String, targetName: String, expectedResult: AccuracyResult) = {
+    val dqContext: DQContext = getDqContext(
+      dataSourcesParam = List(
+        DataSourceParam(
+          name = "source",
+          connectors = List(dataConnectorParam(tableName = sourceName))
+        ),
+        DataSourceParam(
+          name = "target",
+          connectors = List(dataConnectorParam(tableName = targetName))
+        )
+      ))
+
+    val accuracyRule = RuleParam(
+      dslType = "griffin-dsl",
+      dqType = "ACCURACY",
+      outDfName = "person_accuracy",
+      rule = "source.name = target.name"
+    )
+
+    val res = getRuleResults(dqContext, accuracyRule)
+      .as[AccuracyResult]
+      .collect()
+
+    res.length shouldBe 1
+
+    res(0) shouldEqual expectedResult
+  }
+
+  private def getRuleResults(dqContext: DQContext, rule: RuleParam): DataFrame = {
+    val dqJob = DQJobBuilder.buildDQJob(
+      dqContext,
+      evaluateRuleParam = EvaluateRuleParam(List(rule))
+    )
+
+    dqJob.execute(dqContext)
+
+    spark.sql(s"select * from ${rule.getOutDfName()}")
+  }
+
+  private def createPersonTable(): Unit = {
+    val personCsvPath = getClass.getResource("/hive/person_table.csv").getFile
+
+    spark.sql(
+      s"CREATE TABLE ${PERSON_TABLE} " +
+        "( " +
+        "  name String," +
+        "  age int " +
+        ") " +
+        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " +
+        "STORED AS TEXTFILE"
+    )
+
+    spark.sql(s"LOAD DATA LOCAL INPATH '$personCsvPath' OVERWRITE INTO TABLE ${PERSON_TABLE}")
+  }
+
+  private def createEmptyPersonTable(): Unit = {
+    spark.sql(
+      s"CREATE TABLE ${EMPTY_PERSON_TABLE} " +
+        "( " +
+        "  name String," +
+        "  age int " +
+        ") " +
+        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " +
+        "STORED AS TEXTFILE"
+    )
+
+    spark.sql(s"select * from ${EMPTY_PERSON_TABLE}").show()
+  }
+
+  private def dropTables(): Unit = {
+    spark.sql(s"DROP TABLE IF EXISTS ${PERSON_TABLE} ")
+    spark.sql(s"DROP TABLE IF EXISTS ${EMPTY_PERSON_TABLE} ")
+  }
+
+  private def getDqContext(dataSourcesParam: Seq[DataSourceParam], name: String = "test-context"): DQContext = {
+    val dataSources = DataSourceFactory.getDataSources(spark, null, dataSourcesParam)
+    dataSources.foreach(_.init())
+
+    DQContext(
+      ContextId(System.currentTimeMillis),
+      name,
+      dataSources,
+      Nil,
+      BatchProcessType
+    )(spark)
+  }
+
+  private def dataConnectorParam(tableName: String) = {
+    DataConnectorParam(
+      conType = "HIVE",
+      version = null,
+      dataFrameName = null,
+      config = Map("table.name" -> tableName),
+      preProc = null
+    )
+  }
+}