Posted to commits@griffin.apache.org by gu...@apache.org on 2019/08/14 22:06:30 UTC

[griffin] branch master updated: [GRIFFIN-276] Add batch type measure job test case

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new dff8a62  [GRIFFIN-276] Add batch type measure job test case
dff8a62 is described below

commit dff8a6298b8796423c5dc73a8b8d741de4bcac11
Author: wankunde <wa...@163.com>
AuthorDate: Thu Aug 15 06:06:13 2019 +0800

    [GRIFFIN-276] Add batch type measure job test case
    
    Add some test cases for the batch type measure job, which are helpful for checking the correctness of the code and for troubleshooting.
    
    Author: wankunde <wa...@163.com>
    
    Closes #520 from wankunde/batchJobTest.
---
 .../org/apache/griffin/measure/Application.scala   |   2 +-
 .../org/apache/griffin/measure/launch/DQApp.scala  |   5 +-
 .../griffin/measure/launch/batch/BatchDQApp.scala  |   7 +-
 .../measure/launch/streaming/StreamingDQApp.scala  |  13 +-
 .../griffin/measure/job/BatchDQAppTest.scala       | 155 +++++++++++++++++++++
 .../org/apache/griffin/measure/job/DQAppTest.scala |  70 ++++++++++
 6 files changed, 238 insertions(+), 14 deletions(-)

diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 99c6a25..0edeed6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -118,7 +118,7 @@ object Application extends Loggable {
     }
   }
 
-  private def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
+  def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
     val paramReader = ParamReaderFactory.getParamReader(file)
     paramReader.readConfig[T]
   }
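
Dropping the private modifier on readParamFile lets test code load configs
through the same path as the production entry point. A minimal sketch of
such a call, assuming an env-batch.json resource on the test classpath (as
in the new tests below):

    import scala.util.{Failure, Success}

    import org.apache.griffin.measure.Application.readParamFile
    import org.apache.griffin.measure.configuration.dqdefinition.EnvConfig

    object ReadParamSketch {
      def main(args: Array[String]): Unit = {
        // resolve a config file from the test classpath (assumed to exist)
        val path = getClass.getResource("/env-batch.json").getFile

        // readParamFile returns Try[T], so failures stay explicit
        readParamFile[EnvConfig](path) match {
          case Success(env) => println(s"spark log level: ${env.getSparkParam.getLogLevel}")
          case Failure(ex)  => throw ex
        }
      }
    }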
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 71ba89d..c57ebff 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -20,10 +20,11 @@ package org.apache.griffin.measure.launch
 
 import scala.util.Try
 
+import org.apache.spark.sql.SparkSession
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam}
 
-
 /**
   * dq application process
   */
@@ -32,6 +33,8 @@ trait DQApp extends Loggable with Serializable {
   val envParam: EnvConfig
   val dqParam: DQConfig
 
+  implicit var sparkSession: SparkSession = _
+
   def init: Try[_]
 
   /**
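
Hoisting sparkSession into the DQApp trait as an implicit var is what lets
a test harness build its own session and hand it to any app before run is
called. A minimal sketch of that injection, assuming a GriffinConfig has
been loaded elsewhere:

    import org.apache.spark.sql.SparkSession

    import org.apache.griffin.measure.configuration.dqdefinition.GriffinConfig
    import org.apache.griffin.measure.launch.batch.BatchDQApp

    object SessionInjectionSketch {
      // allParam is assumed to be a GriffinConfig loaded elsewhere
      def buildAppForTest(allParam: GriffinConfig, session: SparkSession): BatchDQApp = {
        val app = BatchDQApp(allParam)
        // hand the externally managed session to the app instead of
        // letting the app create one in init
        app.sparkSession = session
        app
      }
    }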
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 4c8e7b9..c05d043 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -41,13 +41,10 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
 
   val sparkParam = envParam.getSparkParam
   val metricName = dqParam.getName
-//  val dataSourceParams = dqParam.dataSources
-//  val dataSourceNames = dataSourceParams.map(_.name)
   val sinkParams = getSinkParams
 
   var sqlContext: SQLContext = _
-
-  implicit var sparkSession: SparkSession = _
+  var dqContext: DQContext = _
 
   def retryable: Boolean = false
 
@@ -78,7 +75,7 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
     dataSources.foreach(_.init)
 
     // create dq context
-    val dqContext: DQContext = DQContext(
+    dqContext = DQContext(
       contextId, metricName, dataSources, sinkParams, BatchProcessType
     )(sparkSession)
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index e80ac61..be32eba 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -34,12 +34,12 @@ import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
 import org.apache.griffin.measure.context.streaming.metric.CacheResults
 import org.apache.griffin.measure.datasource.DataSourceFactory
+import org.apache.griffin.measure.job.DQJob
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
 import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
 
-
 case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
   val envParam: EnvConfig = allParam.getEnvConfig
@@ -47,14 +47,10 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
   val sparkParam = envParam.getSparkParam
   val metricName = dqParam.getName
-//  val dataSourceParams = dqParam.dataSources
-//  val dataSourceNames = dataSourceParams.map(_.name)
   val sinkParams = getSinkParams
 
   var sqlContext: SQLContext = _
 
-  implicit var sparkSession: SparkSession = _
-
   def retryable: Boolean = true
 
   def init: Try[_] = Try {
@@ -170,6 +166,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
     val lock = OffsetCheckpointClient.genLock("process")
     val appSink = globalContext.getSink()
 
+    var dqContext: DQContext = _
+    var dqJob: DQJob = _
+
     def run(): Unit = {
       val updateTimeDate = new Date()
       val updateTime = updateTimeDate.getTime
@@ -185,10 +184,10 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
           val contextId = ContextId(startTime)
 
           // create dq context
-          val dqContext: DQContext = globalContext.cloneDQContext(contextId)
+          dqContext = globalContext.cloneDQContext(contextId)
 
           // build job
-          val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
+          dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
 
           // dq job execute
           dqJob.execute(dqContext)
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
new file mode 100644
index 0000000..053001a
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -0,0 +1,155 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.job
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.griffin.measure.Application.readParamFile
+import org.apache.griffin.measure.configuration.dqdefinition.EnvConfig
+import org.apache.griffin.measure.launch.batch.BatchDQApp
+import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
+
+class BatchDQAppTest extends DQAppTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    envParam = readParamFile[EnvConfig](getConfigFilePath("/env-batch.json")) match {
+      case Success(p) => p
+      case Failure(ex) =>
+        error(ex.getMessage, ex)
+        sys.exit(-2)
+    }
+
+    sparkParam = envParam.getSparkParam
+
+    Try {
+      // build spark 2.0+ application context
+      val conf = new SparkConf().setAppName("BatchDQApp Test")
+      conf.setAll(sparkParam.getConfig)
+      conf.set("spark.sql.crossJoin.enabled", "true")
+
+      sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+      val logLevel = getGriffinLogLevel()
+      sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
+      griffinLogger.setLevel(logLevel)
+      val sqlContext = sparkSession.sqlContext
+
+      // register udf
+      GriffinUDFAgent.register(sqlContext)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    sparkSession.stop()
+  }
+
+  def runAndCheckResult(metrics: Map[String, Any]): Unit = {
+    val runResult = dqApp.run
+    assert(runResult.isSuccess)
+    assert(runResult.get)
+
+    // check Result Metrics
+    val dqContext = dqApp.asInstanceOf[BatchDQApp].dqContext
+    val timestamp = dqContext.contextId.timestamp
+    val expectedMetrics =
+      Map(timestamp -> metrics)
+
+    dqContext.metricWrapper.metrics should equal(expectedMetrics)
+  }
+
+  "accuracy batch job" should "work" in {
+    dqApp = initApp("/_accuracy-batch-griffindsl.json")
+    val expectedMetrics = Map("total_count" -> 50,
+      "miss_count" -> 4,
+      "matched_count" -> 46,
+      "matchedFraction" -> 0.92)
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "completeness batch job" should "work" in {
+    dqApp = initApp("/_completeness-batch-griffindsl.json")
+    val expectedMetrics = Map("total" -> 50,
+      "incomplete" -> 1,
+      "complete" -> 49)
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "distinctness batch job" should "work" in {
+    dqApp = initApp("/_distinctness-batch-griffindsl.json")
+
+    val expectedMetrics = Map("total" -> 50,
+      "distinct" -> 49,
+      "dup" -> Seq(Map("dup" -> 1, "num" -> 1)))
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "profiling batch job" should "work" in {
+    dqApp = initApp("/_profiling-batch-griffindsl.json")
+    val expectedMetrics = Map(
+      "prof" -> Seq(Map("user_id" -> 10004, "cnt" -> 1),
+        Map("user_id" -> 10011, "cnt" -> 1),
+        Map("user_id" -> 10010, "cnt" -> 1),
+        Map("user_id" -> 10002, "cnt" -> 1),
+        Map("user_id" -> 10006, "cnt" -> 1),
+        Map("user_id" -> 10001, "cnt" -> 1),
+        Map("user_id" -> 10005, "cnt" -> 1),
+        Map("user_id" -> 10008, "cnt" -> 1),
+        Map("user_id" -> 10013, "cnt" -> 1),
+        Map("user_id" -> 10003, "cnt" -> 1),
+        Map("user_id" -> 10007, "cnt" -> 1),
+        Map("user_id" -> 10012, "cnt" -> 1),
+        Map("user_id" -> 10009, "cnt" -> 1)
+      ),
+      "post_group" -> Seq(Map("post_code" -> "94022", "cnt" -> 13))
+    )
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "timeliness batch job" should "work" in {
+    dqApp = initApp("/_timeliness-batch-griffindsl.json")
+    val expectedMetrics = Map("total" -> 10,
+      "avg" -> 276000,
+      "percentile_95" -> 660000,
+      "step" -> Seq(Map("step" -> 0, "cnt" -> 6),
+        Map("step" -> 5, "cnt" -> 2),
+        Map("step" -> 3, "cnt" -> 1),
+        Map("step" -> 4, "cnt" -> 1)
+      )
+    )
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "uniqueness batch job" should "work" in {
+    dqApp = initApp("/_uniqueness-batch-griffindsl.json")
+    val expectedMetrics = Map("total" -> 50, "unique" -> 48)
+
+    runAndCheckResult(expectedMetrics)
+  }
+}
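
With initApp and runAndCheckResult in place, covering a further measure
type is mostly a matter of pointing at a config resource and stating the
expected metrics. A hypothetical extra case, where the config file name
and the expected numbers are illustrative only and not part of this
commit:

    class MoreBatchJobsTest extends BatchDQAppTest {
      // hypothetical config resource and expected metrics, for illustration
      "another batch job" should "work" in {
        dqApp = initApp("/_example-batch-griffindsl.json")
        runAndCheckResult(Map("total" -> 50, "incomplete" -> 2, "complete" -> 48))
      }
    }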
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
new file mode 100644
index 0000000..ce38408
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.job
+
+import scala.util.{Failure, Success}
+
+import org.apache.spark.sql.SparkSession
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.griffin.measure.Application._
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.launch.DQApp
+import org.apache.griffin.measure.launch.batch.BatchDQApp
+import org.apache.griffin.measure.launch.streaming.StreamingDQApp
+
+class DQAppTest extends FlatSpec with Matchers with Loggable {
+
+  var envParam: EnvConfig = _
+  var sparkParam: SparkParam = _
+  var sparkSession: SparkSession = _
+
+  var dqApp: DQApp = _
+
+  def getConfigFilePath(fileName: String): String = {
+    getClass.getResource(fileName).getFile
+  }
+
+  def initApp(dqParamFile: String): DQApp = {
+    val dqParam = readParamFile[DQConfig](getConfigFilePath(dqParamFile)) match {
+      case Success(p) => p
+      case Failure(ex) =>
+        error(ex.getMessage, ex)
+        sys.exit(-2)
+    }
+
+    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
+
+    // choose process
+    val procType = ProcessType(allParam.getDqConfig.getProcType)
+    dqApp = procType match {
+      case BatchProcessType => BatchDQApp(allParam)
+      case StreamingProcessType => StreamingDQApp(allParam)
+      case _ =>
+        error(s"${procType} is unsupported process type!")
+        sys.exit(-4)
+    }
+
+    dqApp.sparkSession = sparkSession
+    dqApp
+  }
+
+}