You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/06/02 06:10:42 UTC
incubator-griffin git commit: [GRIFFIN-26] support profiling measurement
Repository: incubator-griffin
Updated Branches:
refs/heads/master ce9e5d399 -> 1cca069dd
[GRIFFIN-26] support profiling measurement
Author: Liu <ll...@ebay.com>
Author: Liu <ll...@lm-shc-16501428.corp.ebay.com>
Author: Lionel Liu <bh...@163.com>
Author: Liu <ll...@lm-shc-16501428.dhcp>
Author: bhlx3lyx7 <bh...@163.com>
Closes #48 from bhlx3lyx7/master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/1cca069d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/1cca069d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/1cca069d
Branch: refs/heads/master
Commit: 1cca069ddf1b3d889f2ca3ccf4c686e04413f608
Parents: ce9e5d3
Author: Liu <ll...@ebay.com>
Authored: Fri Jun 2 14:10:29 2017 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Fri Jun 2 14:10:29 2017 +0800
----------------------------------------------------------------------
README.md | 8 +-
.../griffin/measure/batch/Application.scala | 5 +-
.../measure/batch/algo/BatchProfileAlgo.scala | 137 +++++++++++++++++
.../measure/batch/algo/MeasureType.scala | 8 +
.../measure/batch/algo/ProfileAlgo.scala | 5 +
.../measure/batch/algo/core/ProfileCore.scala | 53 +++++++
.../measure/batch/persist/HdfsPersist.scala | 18 ++-
.../measure/batch/persist/HttpPersist.scala | 40 +++--
.../measure/batch/persist/MultiPersists.scala | 1 +
.../griffin/measure/batch/persist/Persist.scala | 1 +
.../measure/batch/result/ProfileResult.scala | 26 ++++
.../measure/batch/rule/CalculationUtil.scala | 45 ++++--
.../griffin/measure/batch/rule/RuleParser.scala | 5 +-
.../measure/batch/rule/expr/LiteralExpr.scala | 7 +-
.../src/test/resources/config-profile.json | 17 +++
.../batch/algo/BatchProfileAlgoTest.scala | 149 +++++++++++++++++++
.../main/resources/application-dev.properties | 22 +++
.../src/main/resources/application.properties | 4 +-
18 files changed, 513 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 1724e18..dceefda 100644
--- a/README.md
+++ b/README.md
@@ -21,14 +21,18 @@ Release:
```
docker pull bhlx3lyx7/griffin_demo:0.0.1
```
-3. Run this docker image, then griffin is ready.
+3. Increase vm.max_map_count of your local machine, to use elasticsearch.
+ ```
+ sysctl -w vm.max_map_count=262144
+ ```
+4. Run this docker image, then griffin is ready.
```
docker run -it -h sandbox --name griffin_demo -m 8G --memory-swap -1 \
-p 32122:2122 -p 37077:7077 -p 36066:6066 -p 38088:8088 -p 38040:8040 \
-p 33306:3306 -p 39000:9000 -p 38042:8042 -p 38080:8080 -p 37017:27017 \
-p 39083:9083 -p 38998:8998 -p 39200:9200 bhlx3lyx7/griffin_demo:0.0.1
```
-4. Now you can visit UI through your browser, login with account "test" and password "test" if required.
+5. Now you can visit UI through your browser, login with account "test" and password "test" if required.
```
http://<your local IP address>:38080/
```
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
index fcfb34a..84fb1f3 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
@@ -1,6 +1,6 @@
package org.apache.griffin.measure.batch
-import org.apache.griffin.measure.batch.algo.{Algo, BatchAccuracyAlgo}
+import org.apache.griffin.measure.batch.algo._
import org.apache.griffin.measure.batch.config.params._
import org.apache.griffin.measure.batch.config.params.env._
import org.apache.griffin.measure.batch.config.params.user._
@@ -62,7 +62,8 @@ object Application extends Loggable {
// choose algorithm
val dqType = allParam.userParam.dqType
val algo: Algo = dqType match {
- case "accuracy" => BatchAccuracyAlgo(allParam)
+ case MeasureType.accuracy() => BatchAccuracyAlgo(allParam)
+ case MeasureType.profile() => BatchProfileAlgo(allParam)
case _ => {
error(s"${dqType} is unsupported dq type!")
sys.exit(-4)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
new file mode 100644
index 0000000..7ea8d12
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
@@ -0,0 +1,137 @@
+package org.apache.griffin.measure.batch.algo
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.algo.core.ProfileCore
+import org.apache.griffin.measure.batch.config.params._
+import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
+import org.apache.griffin.measure.batch.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+import org.apache.griffin.measure.batch.rule.expr.{Expr, StatementExpr}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.{Failure, Success, Try}
+
+// profile algorithm for batch mode
+case class BatchProfileAlgo(allParam: AllParam) extends ProfileAlgo {
+ val envParam = allParam.envParam
+ val userParam = allParam.userParam
+
+ def run(): Try[_] = {
+ Try {
+ val metricName = userParam.name
+
+ val conf = new SparkConf().setAppName(metricName)
+ val sc = new SparkContext(conf)
+ val sqlContext = new HiveContext(sc)
+
+ // start time
+ val startTime = new Date().getTime()
+
+ // get persists to persist measure result
+ val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
+
+ // get spark application id
+ val applicationId = sc.applicationId
+
+ // persist start id
+ persist.start(applicationId)
+
+ // generate rule from rule param, generate rule analyzer
+ val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+ val rule: StatementExpr = ruleFactory.generateRule()
+ val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+ // const expr value map
+ val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+ val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+
+ // data connector
+ val sourceDataConnector: DataConnector =
+ DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+ ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
+ ) match {
+ case Success(cntr) => {
+ if (cntr.available) cntr
+ else throw new Exception("source data connection error!")
+ }
+ case Failure(ex) => throw ex
+ }
+
+ // get metadata
+ // val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
+ // case Success(md) => md
+ // case _ => throw new Exception("source metadata error!")
+ // }
+
+ // get data
+ val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+ case Success(dt) => dt
+ case Failure(ex) => throw ex
+ }
+
+ // profile algorithm
+ val (profileResult, missingRdd, matchedRdd) = profile(sourceData, ruleAnalyzer)
+
+ // end time
+ val endTime = new Date().getTime
+ persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
+
+ // persist result
+ persist.result(endTime, profileResult)
+ val matchedRecords = matchedRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs))
+ persist.matchRecords(matchedRecords)
+
+ // persist end time
+ val persistEndTime = new Date().getTime
+ persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
+
+ // finish
+ persist.finish()
+
+ // context stop
+ sc.stop
+ }
+ }
+
+ def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
+ (data, Map[String, Any]())
+ }
+
+ // calculate profile from source data
+ def profile(sourceData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
+ ): (ProfileResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
+
+ // 1. wrap data
+ val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2)))
+
+ // 2. profile calculation
+ val (profileResult, missingRdd, matchedRdd) = ProfileCore.profile(sourceWrappedData, ruleAnalyzer)
+
+ (profileResult, missingRdd, matchedRdd)
+ }
+
+ // convert data into a string
+ def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr]): String = {
+ val (key, (data, info)) = rec
+ val persistData = getPersistMap(data, sourcePersist)
+ val persistInfo = info
+ if (persistInfo.size > 0) s"${persistData} [${persistInfo}]" else s"${persistData}"
+ }
+
+ // get the expr value map of the persist expressions
+ private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
+ val persistMap = persist.map(e => (e._id, e.desc)).toMap
+ data.flatMap { pair =>
+ val (k, v) = pair
+ persistMap.get(k) match {
+ case Some(d) => Some((d -> v))
+ case _ => None
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
new file mode 100644
index 0000000..4f6924c
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
@@ -0,0 +1,8 @@
+package org.apache.griffin.measure.batch.algo
+
+object MeasureType {
+
+ val accuracy = """^(?i)accuracy$""".r
+ val profile = """^(?i)profile""".r
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
new file mode 100644
index 0000000..fe64767
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
@@ -0,0 +1,5 @@
+package org.apache.griffin.measure.batch.algo
+
+trait ProfileAlgo extends Algo {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
new file mode 100644
index 0000000..c9fbcff
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
@@ -0,0 +1,53 @@
+package org.apache.griffin.measure.batch.algo.core
+
+import org.apache.griffin.measure.batch.rule.RuleAnalyzer
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+
+
+object ProfileCore {
+
+ type V = Map[String, Any]
+ type T = Map[String, Any]
+
+ // allKvs: rdd of (key, (List[(sourceData, sourceInfo)], List[(targetData, targetInfo)]))
+ // output: accuracy result, missing source data rdd, matched source data rdd
+ def profile(dataRdd: RDD[(Product, (V, T))], ruleAnalyzer: RuleAnalyzer
+ ): (ProfileResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
+
+ val resultRdd: RDD[((Product, (V, T)), Boolean)] = dataRdd.map { kv =>
+ val (key, (data, info)) = kv
+ val (matched, missInfo) = matchData(data, ruleAnalyzer)
+ ((key, (data, info ++ missInfo)), matched)
+ }
+
+ val totalCount = resultRdd.count
+ val matchRdd = resultRdd.filter(_._2).map(_._1)
+ val matchCount = matchRdd.count
+ val missRdd = resultRdd.filter(!_._2).map(_._1)
+ val missCount = missRdd.count
+
+ (ProfileResult(matchCount, totalCount), missRdd, matchRdd)
+
+ }
+
+ // try to match data as rule, return true if matched, false if unmatched
+ private def matchData(data: V, ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
+
+ // 1. check valid
+ if (ruleAnalyzer.rule.valid(data)) {
+ // 2. substitute the cached data into statement, get the statement value
+ val matched = ruleAnalyzer.rule.calculate(data) match {
+ case Some(b: Boolean) => b
+ case _ => false
+ }
+ // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
+ if (matched) (matched, Map[String, Any]())
+ else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
+ } else {
+ (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
index de05eb3..03955d5 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
@@ -26,6 +26,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
val ResultFile = filePath("_RESULT")
val MissRecFile = filePath("_MISSREC") // optional
+ val MatchRecFile = filePath("_MATCHREC") // optional
val LogFile = filePath("_LOG")
@@ -83,6 +84,9 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
case ar: AccuracyResult => {
s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
}
+ case pr: ProfileResult => {
+ s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
+ }
case _ => {
s"result: ${result}"
}
@@ -97,7 +101,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
}
// need to avoid string too long
- def missRecords(records: RDD[String]): Unit = {
+ private def rddRecords(records: RDD[String], path: String): Unit = {
try {
val recordCount = records.count
val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
@@ -105,7 +109,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
if (groupCount <= 1) {
val recs = records.take(count.toInt)
- persistRecords(MissRecFile, recs)
+ persistRecords(path, recs)
} else {
val groupedRecords: RDD[(Long, Iterable[String])] =
records.zipWithIndex.flatMap { r =>
@@ -114,7 +118,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
}.groupByKey()
groupedRecords.foreach { group =>
val (gid, recs) = group
- val hdfsPath = if (gid == 0) MissRecFile else withSuffix(MissRecFile, gid.toString)
+ val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
persistRecords(hdfsPath, recs)
}
}
@@ -124,6 +128,14 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
}
}
+ def missRecords(records: RDD[String]): Unit = {
+ rddRecords(records, MissRecFile)
+ }
+
+ def matchRecords(records: RDD[String]): Unit = {
+ rddRecords(records, MatchRecFile)
+ }
+
private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = {
val recStr = records.mkString("\n")
HdfsUtil.appendContent(hdfsPath, recStr)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
index bb0ea6f..fed4878 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
@@ -23,29 +23,37 @@ case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp:
def finish(): Unit = {}
def result(rt: Long, result: Result): Unit = {
- try {
- result match {
- case ar: AccuracyResult => {
- val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
- val data = JsonUtil.toJson(dataMap)
-
- // post
- val params = Map[String, Object]()
-// val header = Map[String, Object](("content-type" -> "application/json"))
- val header = Map[String, Object]()
- val status = HttpUtil.httpRequest(api, method, params, header, data)
- info(s"${method} to ${api} response status: ${status}")
- }
- case _ => {
- info(s"result: ${result}")
- }
+ result match {
+ case ar: AccuracyResult => {
+ val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
+ httpResult(dataMap)
+ }
+ case pr: ProfileResult => {
+ val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch))
+ httpResult(dataMap)
}
+ case _ => {
+ info(s"result: ${result}")
+ }
+ }
+ }
+
+ private def httpResult(dataMap: Map[String, Any]) = {
+ try {
+ val data = JsonUtil.toJson(dataMap)
+ // post
+ val params = Map[String, Object]()
+ val header = Map[String, Object]()
+ val status = HttpUtil.httpRequest(api, method, params, header, data)
+ info(s"${method} to ${api} response status: ${status}")
} catch {
case e: Throwable => error(e.getMessage)
}
+
}
def missRecords(records: RDD[String]): Unit = {}
+ def matchRecords(records: RDD[String]): Unit = {}
def log(rt: Long, msg: String): Unit = {}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
index 11973e9..1e5fbed 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
@@ -24,6 +24,7 @@ case class MultiPersists(persists: Iterable[Persist]) extends Persist {
def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) }
def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) }
+ def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) }
def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
index 16a8edd..28dcd22 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
@@ -20,6 +20,7 @@ trait Persist extends Loggable with Serializable {
def result(rt: Long, result: Result): Unit
def missRecords(records: RDD[String]): Unit
+ def matchRecords(records: RDD[String]): Unit
def log(rt: Long, msg: String): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
new file mode 100644
index 0000000..0b52bfb
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
@@ -0,0 +1,26 @@
+package org.apache.griffin.measure.batch.result
+
+// result for profile: match count, total count
+case class ProfileResult(matchCount: Long, totalCount: Long) extends Result {
+
+ type T = ProfileResult
+
+ def update(delta: T): T = {
+ ProfileResult(matchCount + delta.matchCount, totalCount)
+ }
+
+ def eventual(): Boolean = {
+ this.matchCount >= totalCount
+ }
+
+ def differsFrom(other: T): Boolean = {
+ (this.matchCount != other.matchCount) || (this.totalCount != other.totalCount)
+ }
+
+ def getMiss = totalCount - matchCount
+ def getTotal = totalCount
+ def getMatch = matchCount
+
+ def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
index e4e7bbf..9f2e29d 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
@@ -13,6 +13,8 @@ object CalculationUtil {
def + (other: Option[_]): Option[_] = {
Try {
(value, other) match {
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString)
case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte)
case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
@@ -20,18 +22,19 @@ object CalculationUtil {
case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
- case (None, Some(v2)) => other
case _ => value
}
} match {
case Success(opt) => opt
- case _ => value
+ case _ => None
}
}
def - (other: Option[_]): Option[_] = {
Try {
(value, other) match {
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
@@ -42,13 +45,15 @@ object CalculationUtil {
}
} match {
case Success(opt) => opt
- case _ => value
+ case _ => None
}
}
def * (other: Option[_]): Option[_] = {
Try {
(value, other) match {
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
@@ -61,13 +66,15 @@ object CalculationUtil {
}
} match {
case Success(opt) => opt
- case _ => value
+ case _ => None
}
}
def / (other: Option[_]): Option[_] = {
Try {
(value, other) match {
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
@@ -78,13 +85,15 @@ object CalculationUtil {
}
} match {
case Success(opt) => opt
- case _ => value
+ case _ => None
}
}
def % (other: Option[_]): Option[_] = {
Try {
(value, other) match {
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
@@ -95,12 +104,14 @@ object CalculationUtil {
}
} match {
case Success(opt) => opt
- case _ => value
+ case _ => None
}
}
def unary_- (): Option[_] = {
value match {
+ case None => None
+ case Some(null) => None
case Some(v: String) => Some(v.reverse.toString)
case Some(v: Boolean) => Some(!v)
case Some(v: Byte) => Some(-v)
@@ -117,16 +128,16 @@ object CalculationUtil {
def === (other: Option[_]): Option[Boolean] = {
(value, other) match {
- case (Some(v1), Some(v2)) => Some(v1 == v2)
case (None, None) => Some(true)
+ case (Some(v1), Some(v2)) => Some(v1 == v2)
case _ => Some(false)
}
}
def =!= (other: Option[_]): Option[Boolean] = {
(value, other) match {
- case (Some(v1), Some(v2)) => Some(v1 != v2)
case (None, None) => Some(false)
+ case (Some(v1), Some(v2)) => Some(v1 != v2)
case _ => Some(true)
}
}
@@ -134,6 +145,8 @@ object CalculationUtil {
def > (other: Option[_]): Option[Boolean] = {
Try {
(value, other) match {
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(v1: String), Some(v2: String)) => Some(v1 > v2)
case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble)
case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
@@ -152,7 +165,9 @@ object CalculationUtil {
def >= (other: Option[_]): Option[Boolean] = {
Try {
(value, other) match {
- case (None, None) => Some(true)
+ case (None, None) | (Some(null), Some(null)) => Some(true)
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
@@ -171,6 +186,8 @@ object CalculationUtil {
def < (other: Option[_]): Option[Boolean] = {
Try {
(value, other) match {
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
@@ -189,7 +206,9 @@ object CalculationUtil {
def <= (other: Option[_]): Option[Boolean] = {
Try {
(value, other) match {
- case (None, None) => Some(true)
+ case (None, None) | (Some(null), Some(null)) => Some(true)
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
@@ -249,12 +268,16 @@ object CalculationUtil {
private def optNot(a: Option[_]): Option[Boolean] = {
a match {
+ case None => None
+ case Some(null) => None
case Some(v: Boolean) => Some(!v)
case _ => None
}
}
private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = {
(a, b) match {
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(false), _) | (_, Some(false)) => Some(false)
case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
case _ => None
@@ -262,6 +285,8 @@ object CalculationUtil {
}
private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = {
(a, b) match {
+ case (None, _) | (_, None) => None
+ case (Some(null), _) | (_, Some(null)) => None
case (Some(true), _) | (_, Some(true)) => Some(true)
case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
case _ => None
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
index 47bf38e..49094e8 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
@@ -120,12 +120,13 @@ case class RuleParser() extends JavaTokenParsers with Serializable {
import SomeNumber._
// -- literal --
- def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull
+ def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull | literialNone
def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
def literialTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
- def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r | """(?i)none""".r) ^^ { LiteralNullExpr(_) }
+ def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r) ^^ { LiteralNullExpr(_) }
+ def literialNone: Parser[LiteralNoneExpr] = """(?i)none""".r ^^ { LiteralNoneExpr(_) }
// -- selection --
// <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
index bbfac8b..8bf0dd6 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
@@ -68,6 +68,11 @@ case class LiteralBooleanExpr(expr: String) extends LiteralExpr {
}
case class LiteralNullExpr(expr: String) extends LiteralExpr {
- val value: Option[Any] = None
+ val value: Option[Any] = Some(null)
val desc: String = "null"
+}
+
+case class LiteralNoneExpr(expr: String) extends LiteralExpr {
+ val value: Option[Any] = None
+ val desc: String = "none"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/test/resources/config-profile.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/config-profile.json b/measure/measure-batch/src/test/resources/config-profile.json
new file mode 100644
index 0000000..6b82d7f
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/config-profile.json
@@ -0,0 +1,17 @@
+{
+ "name": "prof1",
+ "type": "profile",
+
+ "source": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
+ }
+ },
+
+ "evaluateRule": {
+ "sampleRatio": 1,
+ "rules": "$source.post_code == null"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
new file mode 100644
index 0000000..a1645f3
--- /dev/null
+++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
@@ -0,0 +1,149 @@
+//package org.apache.griffin.measure.batch.algo
+//
+//import java.util.Date
+//
+//import org.apache.griffin.measure.batch.config.params._
+//import org.apache.griffin.measure.batch.config.params.env._
+//import org.apache.griffin.measure.batch.config.params.user._
+//import org.apache.griffin.measure.batch.config.reader._
+//import org.apache.griffin.measure.batch.config.validator._
+//import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
+//import org.apache.griffin.measure.batch.log.Loggable
+//import org.apache.griffin.measure.batch.rule.expr._
+//import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+// val envFile = "src/test/resources/env.json"
+// val confFile = "src/test/resources/config-profile.json"
+// val envFsType = "local"
+// val userFsType = "local"
+//
+// val args = Array(envFile, confFile)
+//
+// var sc: SparkContext = _
+// var sqlContext: SQLContext = _
+//
+// var allParam: AllParam = _
+//
+// before {
+// // read param files
+// val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+// case Success(p) => p
+// case Failure(ex) => {
+// error(ex.getMessage)
+// sys.exit(-2)
+// }
+// }
+// val userParam = readParamFile[UserParam](confFile, userFsType) match {
+// case Success(p) => p
+// case Failure(ex) => {
+// error(ex.getMessage)
+// sys.exit(-2)
+// }
+// }
+// allParam = AllParam(envParam, userParam)
+//
+// // validate param files
+// validateParams(allParam) match {
+// case Failure(ex) => {
+// error(ex.getMessage)
+// sys.exit(-3)
+// }
+// case _ => {
+// info("params validation pass")
+// }
+// }
+//
+// val metricName = userParam.name
+// val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+// sc = new SparkContext(conf)
+// sqlContext = new SQLContext(sc)
+// }
+//
+// test("algorithm") {
+// Try {
+// val envParam = allParam.envParam
+// val userParam = allParam.userParam
+//
+// // start time
+// val startTime = new Date().getTime()
+//
+// // get spark application id
+// val applicationId = sc.applicationId
+//
+// // rules
+// val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+// val rule: StatementExpr = ruleFactory.generateRule()
+// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+// ruleAnalyzer.constCacheExprs.foreach(println)
+// ruleAnalyzer.constFinalCacheExprs.foreach(println)
+//
+// // global cache data
+// val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+// val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//
+// // data connector
+// val sourceDataConnector: DataConnector =
+// DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+// ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
+// ) match {
+// case Success(cntr) => {
+// if (cntr.available) cntr
+// else throw new Exception("source data not available!")
+// }
+// case Failure(ex) => throw ex
+// }
+//
+// // get data
+// val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+// case Success(dt) => dt
+// case Failure(ex) => throw ex
+// }
+//
+// // my algo
+// val algo = BatchProfileAlgo(allParam)
+//
+// // profile algorithm
+// val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
+//
+// println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
+//
+// matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
+//
+// // end time
+// val endTime = new Date().getTime
+// println(s"using time: ${endTime - startTime} ms")
+// } match {
+// case Failure(ex) => {
+// error(ex.getMessage)
+// sys.exit(-4)
+// }
+// case _ => {
+// info("calculation finished")
+// }
+// }
+// }
+//
+// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+// val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+// paramReader.readConfig[T]
+// }
+//
+// private def validateParams(allParam: AllParam): Try[Boolean] = {
+// val allParamValidator = AllParamValidator()
+// allParamValidator.validate(allParam)
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/service/src/main/resources/application-dev.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application-dev.properties b/service/src/main/resources/application-dev.properties
new file mode 100644
index 0000000..12f8865
--- /dev/null
+++ b/service/src/main/resources/application-dev.properties
@@ -0,0 +1,22 @@
+spring.datasource.url= jdbc:mysql://localhost:3306/quartz?autoReconnect=true&useSSL=false
+spring.datasource.username =griffin
+spring.datasource.password =123456
+
+spring.datasource.driver-class-name=com.mysql.jdbc.Driver
+
+## Hibernate ddl auto (validate,create, create-drop, update)
+
+spring.jpa.hibernate.ddl-auto = create-drop
+spring.jpa.show-sql=true
+spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
+#
+#
+## Naming strategy
+spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy
+
+# hive metastore
+hive.metastore.uris = thrift://10.9.246.187:9083
+hive.metastore.dbname = default
+
+# kafka schema registry
+kafka.schema.registry.url = http://10.65.159.119:8081
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/service/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index 12f8865..685ca8a 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -15,8 +15,8 @@ spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy
# hive metastore
-hive.metastore.uris = thrift://10.9.246.187:9083
+hive.metastore.uris = thrift://localhost:9083
hive.metastore.dbname = default
# kafka schema registry
-kafka.schema.registry.url = http://10.65.159.119:8081
+kafka.schema.registry.url = http://localhost:8081