You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/08/14 22:06:30 UTC
[griffin] branch master updated: [GRIFFIN-276] Add batch type
measure job test case
This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new dff8a62 [GRIFFIN-276] Add batch type measure job test case
dff8a62 is described below
commit dff8a6298b8796423c5dc73a8b8d741de4bcac11
Author: wankunde <wa...@163.com>
AuthorDate: Thu Aug 15 06:06:13 2019 +0800
[GRIFFIN-276] Add batch type measure job test case
Add some test cases for batch type measure jobs, which help check the correctness of the code and aid troubleshooting.
Author: wankunde <wa...@163.com>
Closes #520 from wankunde/batchJobTest.
---
.../org/apache/griffin/measure/Application.scala | 2 +-
.../org/apache/griffin/measure/launch/DQApp.scala | 5 +-
.../griffin/measure/launch/batch/BatchDQApp.scala | 7 +-
.../measure/launch/streaming/StreamingDQApp.scala | 13 +-
.../griffin/measure/job/BatchDQAppTest.scala | 155 +++++++++++++++++++++
.../org/apache/griffin/measure/job/DQAppTest.scala | 70 ++++++++++
6 files changed, 238 insertions(+), 14 deletions(-)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 99c6a25..0edeed6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -118,7 +118,7 @@ object Application extends Loggable {
}
}
- private def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
+ def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
val paramReader = ParamReaderFactory.getParamReader(file)
paramReader.readConfig[T]
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 71ba89d..c57ebff 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -20,10 +20,11 @@ package org.apache.griffin.measure.launch
import scala.util.Try
+import org.apache.spark.sql.SparkSession
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam}
-
/**
* dq application process
*/
@@ -32,6 +33,8 @@ trait DQApp extends Loggable with Serializable {
val envParam: EnvConfig
val dqParam: DQConfig
+ implicit var sparkSession: SparkSession = _
+
def init: Try[_]
/**
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 4c8e7b9..c05d043 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -41,13 +41,10 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
val sparkParam = envParam.getSparkParam
val metricName = dqParam.getName
-// val dataSourceParams = dqParam.dataSources
-// val dataSourceNames = dataSourceParams.map(_.name)
val sinkParams = getSinkParams
var sqlContext: SQLContext = _
-
- implicit var sparkSession: SparkSession = _
+ var dqContext: DQContext = _
def retryable: Boolean = false
@@ -78,7 +75,7 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
dataSources.foreach(_.init)
// create dq context
- val dqContext: DQContext = DQContext(
+ dqContext = DQContext(
contextId, metricName, dataSources, sinkParams, BatchProcessType
)(sparkSession)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index e80ac61..be32eba 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -34,12 +34,12 @@ import org.apache.griffin.measure.context._
import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
import org.apache.griffin.measure.context.streaming.metric.CacheResults
import org.apache.griffin.measure.datasource.DataSourceFactory
+import org.apache.griffin.measure.job.DQJob
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-
case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
val envParam: EnvConfig = allParam.getEnvConfig
@@ -47,14 +47,10 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
val sparkParam = envParam.getSparkParam
val metricName = dqParam.getName
-// val dataSourceParams = dqParam.dataSources
-// val dataSourceNames = dataSourceParams.map(_.name)
val sinkParams = getSinkParams
var sqlContext: SQLContext = _
- implicit var sparkSession: SparkSession = _
-
def retryable: Boolean = true
def init: Try[_] = Try {
@@ -170,6 +166,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
val lock = OffsetCheckpointClient.genLock("process")
val appSink = globalContext.getSink()
+ var dqContext: DQContext = _
+ var dqJob: DQJob = _
+
def run(): Unit = {
val updateTimeDate = new Date()
val updateTime = updateTimeDate.getTime
@@ -185,10 +184,10 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
val contextId = ContextId(startTime)
// create dq context
- val dqContext: DQContext = globalContext.cloneDQContext(contextId)
+ dqContext = globalContext.cloneDQContext(contextId)
// build job
- val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
+ dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
// dq job execute
dqJob.execute(dqContext)
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
new file mode 100644
index 0000000..053001a
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -0,0 +1,176 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at

+ http://www.apache.org/licenses/LICENSE-2.0

+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.job
+
+import scala.util.{Failure, Success}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.griffin.measure.Application.readParamFile
+import org.apache.griffin.measure.configuration.dqdefinition.EnvConfig
+import org.apache.griffin.measure.launch.batch.BatchDQApp
+import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
+
+/**
+ * End-to-end tests for batch-type measure jobs: each case loads a DQ config
+ * from the test classpath, runs it through [[BatchDQApp]] and compares the
+ * metrics recorded in the app's DQ context with the expected values.
+ */
+class BatchDQAppTest extends DQAppTest with BeforeAndAfterAll {
+
+  /**
+   * Loads `/env-batch.json`, creates the shared SparkSession and registers
+   * the Griffin UDFs. Any failure aborts the suite with its real cause
+   * instead of killing the JVM or leaving `sparkSession` null.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    envParam = readParamFile[EnvConfig](getConfigFilePath("/env-batch.json")) match {
+      case Success(p) => p
+      case Failure(ex) =>
+        error(ex.getMessage, ex)
+        fail(s"Failed to read env config: ${ex.getMessage}", ex)
+    }
+
+    sparkParam = envParam.getSparkParam
+
+    // build spark 2.0+ application context; exceptions propagate on purpose
+    val conf = new SparkConf().setAppName("BatchDQApp Test")
+    conf.setAll(sparkParam.getConfig)
+    conf.set("spark.sql.crossJoin.enabled", "true")
+
+    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+    val logLevel = getGriffinLogLevel()
+    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
+    griffinLogger.setLevel(logLevel)
+
+    // register udf
+    GriffinUDFAgent.register(sparkSession.sqlContext)
+  }
+
+  /** Stops the shared SparkSession (if one was created), then runs the parent teardown. */
+  override def afterAll(): Unit = {
+    try {
+      Option(sparkSession).foreach(_.stop())
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  /**
+   * Runs the current `dqApp`, asserts that it succeeded and that the metrics
+   * recorded for its context timestamp equal `metrics`.
+   *
+   * @param metrics expected metric name -> value map for the single run
+   */
+  def runAndCheckResult(metrics: Map[String, Any]): Unit = {
+    // surface the job's own exception instead of a bare "isSuccess was false"
+    dqApp.run match {
+      case Failure(ex) => fail(s"DQ job failed: ${ex.getMessage}", ex)
+      case Success(succeeded) => assert(succeeded, "DQ job reported an unsuccessful run")
+    }
+
+    // check Result Metrics
+    val dqContext = dqApp match {
+      case app: BatchDQApp => app.dqContext
+      case _ => fail("runAndCheckResult only supports BatchDQApp")
+    }
+    val timestamp = dqContext.contextId.timestamp
+    val expectedMetrics = Map(timestamp -> metrics)
+
+    dqContext.metricWrapper.metrics should equal(expectedMetrics)
+  }
+
+  "accuracy batch job" should "work" in {
+    dqApp = initApp("/_accuracy-batch-griffindsl.json")
+    val expectedMetrics = Map("total_count" -> 50,
+      "miss_count" -> 4,
+      "matched_count" -> 46,
+      "matchedFraction" -> 0.92)
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "completeness batch job" should "work" in {
+    dqApp = initApp("/_completeness-batch-griffindsl.json")
+    val expectedMetrics = Map("total" -> 50,
+      "incomplete" -> 1,
+      "complete" -> 49)
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "distinctness batch job" should "work" in {
+    dqApp = initApp("/_distinctness-batch-griffindsl.json")
+
+    val expectedMetrics = Map("total" -> 50,
+      "distinct" -> 49,
+      "dup" -> Seq(Map("dup" -> 1, "num" -> 1)))
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "profiling batch job" should "work" in {
+    dqApp = initApp("/_profiling-batch-griffindsl.json")
+    val expectedMetrics = Map(
+      "prof" -> Seq(Map("user_id" -> 10004, "cnt" -> 1),
+        Map("user_id" -> 10011, "cnt" -> 1),
+        Map("user_id" -> 10010, "cnt" -> 1),
+        Map("user_id" -> 10002, "cnt" -> 1),
+        Map("user_id" -> 10006, "cnt" -> 1),
+        Map("user_id" -> 10001, "cnt" -> 1),
+        Map("user_id" -> 10005, "cnt" -> 1),
+        Map("user_id" -> 10008, "cnt" -> 1),
+        Map("user_id" -> 10013, "cnt" -> 1),
+        Map("user_id" -> 10003, "cnt" -> 1),
+        Map("user_id" -> 10007, "cnt" -> 1),
+        Map("user_id" -> 10012, "cnt" -> 1),
+        Map("user_id" -> 10009, "cnt" -> 1)
+      ),
+      "post_group" -> Seq(Map("post_code" -> "94022", "cnt" -> 13))
+    )
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "timeliness batch job" should "work" in {
+    dqApp = initApp("/_timeliness-batch-griffindsl.json")
+    val expectedMetrics = Map("total" -> 10,
+      "avg" -> 276000,
+      "percentile_95" -> 660000,
+      "step" -> Seq(Map("step" -> 0, "cnt" -> 6),
+        Map("step" -> 5, "cnt" -> 2),
+        Map("step" -> 3, "cnt" -> 1),
+        Map("step" -> 4, "cnt" -> 1)
+      )
+    )
+
+    runAndCheckResult(expectedMetrics)
+  }
+
+  "uniqueness batch job" should "work" in {
+    dqApp = initApp("/_uniqueness-batch-griffindsl.json")
+    val expectedMetrics = Map("total" -> 50, "unique" -> 48)
+
+    runAndCheckResult(expectedMetrics)
+  }
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
new file mode 100644
index 0000000..ce38408
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -0,0 +1,93 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at

+ http://www.apache.org/licenses/LICENSE-2.0

+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.job
+
+import scala.util.{Failure, Success}
+
+import org.apache.spark.sql.SparkSession
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.griffin.measure.Application._
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.launch.DQApp
+import org.apache.griffin.measure.launch.batch.BatchDQApp
+import org.apache.griffin.measure.launch.streaming.StreamingDQApp
+
+/**
+ * Shared fixture for measure job tests. Subclasses are expected to populate
+ * `envParam`, `sparkParam` and `sparkSession` (e.g. in `beforeAll`) before
+ * calling [[initApp]].
+ */
+class DQAppTest extends FlatSpec with Matchers with Loggable {
+
+  var envParam: EnvConfig = _
+  var sparkParam: SparkParam = _
+  var sparkSession: SparkSession = _
+
+  var dqApp: DQApp = _
+
+  /**
+   * Resolves a test-classpath resource to its file-system path.
+   *
+   * @param fileName resource name, e.g. `/env-batch.json`
+   * @return the path of the resource file
+   */
+  def getConfigFilePath(fileName: String): String = {
+    // fail with the missing name instead of a bare NullPointerException
+    Option(getClass.getResource(fileName))
+      .map(_.getFile)
+      .getOrElse(fail(s"Test resource not found on classpath: $fileName"))
+  }
+
+  /**
+   * Reads a DQ config from the test classpath and builds the matching
+   * [[DQApp]] bound to the shared `sparkSession`. The app is also stored
+   * in `dqApp`.
+   *
+   * @param dqParamFile resource name of the DQ config JSON
+   * @return the created app
+   */
+  def initApp(dqParamFile: String): DQApp = {
+    val dqParam = readParamFile[DQConfig](getConfigFilePath(dqParamFile)) match {
+      case Success(p) => p
+      case Failure(ex) =>
+        error(ex.getMessage, ex)
+        // fail the test instead of sys.exit, which would kill the test JVM
+        fail(s"Failed to read DQ config $dqParamFile: ${ex.getMessage}", ex)
+    }
+
+    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
+
+    // choose process
+    val procType = ProcessType(allParam.getDqConfig.getProcType)
+    dqApp = procType match {
+      case BatchProcessType => BatchDQApp(allParam)
+      case StreamingProcessType => StreamingDQApp(allParam)
+      case _ =>
+        error(s"${procType} is unsupported process type!")
+        fail(s"${procType} is unsupported process type!")
+    }
+
+    dqApp.sparkSession = sparkSession
+    dqApp
+  }
+
+}