Posted to commits@griffin.apache.org by gu...@apache.org on 2017/06/02 06:10:42 UTC

incubator-griffin git commit: [GRIFFIN-26] support profiling measurement

Repository: incubator-griffin
Updated Branches:
  refs/heads/master ce9e5d399 -> 1cca069dd


[GRIFFIN-26] support profiling measurement

Author: Liu <ll...@ebay.com>
Author: Liu <ll...@lm-shc-16501428.corp.ebay.com>
Author: Lionel Liu <bh...@163.com>
Author: Liu <ll...@lm-shc-16501428.dhcp>
Author: bhlx3lyx7 <bh...@163.com>

Closes #48 from bhlx3lyx7/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/1cca069d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/1cca069d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/1cca069d

Branch: refs/heads/master
Commit: 1cca069ddf1b3d889f2ca3ccf4c686e04413f608
Parents: ce9e5d3
Author: Liu <ll...@ebay.com>
Authored: Fri Jun 2 14:10:29 2017 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Fri Jun 2 14:10:29 2017 +0800

----------------------------------------------------------------------
 README.md                                       |   8 +-
 .../griffin/measure/batch/Application.scala     |   5 +-
 .../measure/batch/algo/BatchProfileAlgo.scala   | 137 +++++++++++++++++
 .../measure/batch/algo/MeasureType.scala        |   8 +
 .../measure/batch/algo/ProfileAlgo.scala        |   5 +
 .../measure/batch/algo/core/ProfileCore.scala   |  53 +++++++
 .../measure/batch/persist/HdfsPersist.scala     |  18 ++-
 .../measure/batch/persist/HttpPersist.scala     |  40 +++--
 .../measure/batch/persist/MultiPersists.scala   |   1 +
 .../griffin/measure/batch/persist/Persist.scala |   1 +
 .../measure/batch/result/ProfileResult.scala    |  26 ++++
 .../measure/batch/rule/CalculationUtil.scala    |  45 ++++--
 .../griffin/measure/batch/rule/RuleParser.scala |   5 +-
 .../measure/batch/rule/expr/LiteralExpr.scala   |   7 +-
 .../src/test/resources/config-profile.json      |  17 +++
 .../batch/algo/BatchProfileAlgoTest.scala       | 149 +++++++++++++++++++
 .../main/resources/application-dev.properties   |  22 +++
 .../src/main/resources/application.properties   |   4 +-
 18 files changed, 513 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 1724e18..dceefda 100644
--- a/README.md
+++ b/README.md
@@ -21,14 +21,18 @@ Release:
     ```
     docker pull bhlx3lyx7/griffin_demo:0.0.1
     ```
-3. Run this docker image, then griffin is ready.
+3. Increase vm.max_map_count on your local machine, which elasticsearch requires.  
+    ```
+    sysctl -w vm.max_map_count=262144
+    ```  
+4. Run this docker image, then griffin is ready.
     ```
     docker run -it -h sandbox --name griffin_demo -m 8G --memory-swap -1 \
     -p 32122:2122 -p 37077:7077 -p 36066:6066 -p 38088:8088 -p 38040:8040 \
     -p 33306:3306 -p 39000:9000 -p 38042:8042 -p 38080:8080 -p 37017:27017 \
     -p 39083:9083 -p 38998:8998 -p 39200:9200 bhlx3lyx7/griffin_demo:0.0.1
     ```
-4. Now you can visit UI through your browser, login with account "test" and password "test" if required.
+5. Now you can visit UI through your browser, login with account "test" and password "test" if required.
     ```
     http://<your local IP address>:38080/
     ```

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
index fcfb34a..84fb1f3 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
@@ -1,6 +1,6 @@
 package org.apache.griffin.measure.batch
 
-import org.apache.griffin.measure.batch.algo.{Algo, BatchAccuracyAlgo}
+import org.apache.griffin.measure.batch.algo._
 import org.apache.griffin.measure.batch.config.params._
 import org.apache.griffin.measure.batch.config.params.env._
 import org.apache.griffin.measure.batch.config.params.user._
@@ -62,7 +62,8 @@ object Application extends Loggable {
     // choose algorithm
     val dqType = allParam.userParam.dqType
     val algo: Algo = dqType match {
-      case "accuracy" => BatchAccuracyAlgo(allParam)
+      case MeasureType.accuracy() => BatchAccuracyAlgo(allParam)
+      case MeasureType.profile() => BatchProfileAlgo(allParam)
       case _ => {
         error(s"${dqType} is unsupported dq type!")
         sys.exit(-4)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
new file mode 100644
index 0000000..7ea8d12
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
@@ -0,0 +1,137 @@
+package org.apache.griffin.measure.batch.algo
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.algo.core.ProfileCore
+import org.apache.griffin.measure.batch.config.params._
+import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
+import org.apache.griffin.measure.batch.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+import org.apache.griffin.measure.batch.rule.expr.{Expr, StatementExpr}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.{Failure, Success, Try}
+
+// profile algorithm for batch mode
+case class BatchProfileAlgo(allParam: AllParam) extends ProfileAlgo {
+  val envParam = allParam.envParam
+  val userParam = allParam.userParam
+
+  def run(): Try[_] = {
+    Try {
+      val metricName = userParam.name
+
+      val conf = new SparkConf().setAppName(metricName)
+      val sc = new SparkContext(conf)
+      val sqlContext = new HiveContext(sc)
+
+      // start time
+      val startTime = new Date().getTime()
+
+      // get persist to save the measure result
+      val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
+
+      // get spark application id
+      val applicationId = sc.applicationId
+
+      // persist start id
+      persist.start(applicationId)
+
+      // generate rule from rule param, generate rule analyzer
+      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+      val rule: StatementExpr = ruleFactory.generateRule()
+      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+      // const expr value map
+      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+
+      // data connector
+      val sourceDataConnector: DataConnector =
+      DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+        ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
+      ) match {
+        case Success(cntr) => {
+          if (cntr.available) cntr
+          else throw new Exception("source data connection error!")
+        }
+        case Failure(ex) => throw ex
+      }
+
+      // get metadata
+      //      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
+      //        case Success(md) => md
+      //        case _ => throw new Exception("source metadata error!")
+      //      }
+
+      // get data
+      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+
+      // profile algorithm
+      val (profileResult, missingRdd, matchedRdd) = profile(sourceData, ruleAnalyzer)
+
+      // end time
+      val endTime = new Date().getTime
+      persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
+
+      // persist result
+      persist.result(endTime, profileResult)
+      val matchedRecords = matchedRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs))
+      persist.matchRecords(matchedRecords)
+
+      // persist end time
+      val persistEndTime = new Date().getTime
+      persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
+
+      // finish
+      persist.finish()
+
+      // context stop
+      sc.stop
+    }
+  }
+
+  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
+    (data, Map[String, Any]())
+  }
+
+  // calculate profile from source data
+  def profile(sourceData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
+              ): (ProfileResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
+
+    // 1. wrap data
+    val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2)))
+
+    // 2. profile calculation
+    val (profileResult, missingRdd, matchedRdd) = ProfileCore.profile(sourceWrappedData, ruleAnalyzer)
+
+    (profileResult, missingRdd, matchedRdd)
+  }
+
+  // convert data into a string
+  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr]): String = {
+    val (key, (data, info)) = rec
+    val persistData = getPersistMap(data, sourcePersist)
+    val persistInfo = info
+    if (persistInfo.size > 0) s"${persistData} [${persistInfo}]" else s"${persistData}"
+  }
+
+  // get the expr value map of the persist expressions
+  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
+    val persistMap = persist.map(e => (e._id, e.desc)).toMap
+    data.flatMap { pair =>
+      val (k, v) = pair
+      persistMap.get(k) match {
+        case Some(d) => Some((d -> v))
+        case _ => None
+      }
+    }
+  }
+
+}
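
The persistence helper getPersistMap above renames raw row keys (internal expression ids) to their human-readable descriptions before records are written out. A minimal, Spark-free sketch of that renaming, using hypothetical ids and descriptions in place of Expr._id / Expr.desc:

```
object PersistMapSketch extends App {
  // hypothetical expression ids and descriptions, standing in for Expr._id / Expr.desc
  val persistMap = Map("_expr1" -> "$source.user_id", "_expr2" -> "$source.post_code")

  // a raw row keyed by expression id; "_other" is not a persist expression
  val data = Map[String, Any]("_expr1" -> 10001, "_expr2" -> null, "_other" -> "dropped")

  // same logic as getPersistMap: keep only persist exprs, re-keyed by description
  val persistData = data.flatMap { case (k, v) => persistMap.get(k).map(d => d -> v) }

  println(persistData)  // Map($source.user_id -> 10001, $source.post_code -> null)
}
```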

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
new file mode 100644
index 0000000..4f6924c
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
@@ -0,0 +1,8 @@
+package org.apache.griffin.measure.batch.algo
+
+object MeasureType {
+
+  val accuracy = """^(?i)accuracy$""".r
+  val profile = """^(?i)profile""".r
+
+}
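
These vals are Regex objects, so the new case MeasureType.accuracy() arms in Application.scala rely on Scala's regex extractor, which only succeeds on a whole-string match (Regex.unapplySeq calls Matcher.matches). A self-contained sketch of the same dispatch, with toy algorithm names:

```
import scala.util.matching.Regex

object MeasureTypeSketch extends App {
  val accuracy: Regex = """^(?i)accuracy$""".r
  val profile: Regex = """^(?i)profile""".r

  // a Regex used as a pattern must match the entire string,
  // so "Profile" and "PROFILE" dispatch case-insensitively
  def dispatch(dqType: String): String = dqType match {
    case accuracy() => "BatchAccuracyAlgo"
    case profile()  => "BatchProfileAlgo"
    case _          => "unsupported"
  }

  Seq("accuracy", "Profile", "PROFILE", "other").foreach(t => println(s"$t -> ${dispatch(t)}"))
}
```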

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
new file mode 100644
index 0000000..fe64767
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
@@ -0,0 +1,5 @@
+package org.apache.griffin.measure.batch.algo
+
+trait ProfileAlgo extends Algo {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
new file mode 100644
index 0000000..c9fbcff
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
@@ -0,0 +1,53 @@
+package org.apache.griffin.measure.batch.algo.core
+
+import org.apache.griffin.measure.batch.rule.RuleAnalyzer
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+
+
+object ProfileCore {
+
+  type V = Map[String, Any]
+  type T = Map[String, Any]
+
+  // dataRdd: rdd of (key, (data, info))
+  // output: profile result, missing source data rdd, matched source data rdd
+  def profile(dataRdd: RDD[(Product, (V, T))], ruleAnalyzer: RuleAnalyzer
+              ): (ProfileResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
+
+    val resultRdd: RDD[((Product, (V, T)), Boolean)] = dataRdd.map { kv =>
+      val (key, (data, info)) = kv
+      val (matched, missInfo) = matchData(data, ruleAnalyzer)
+      ((key, (data, info ++ missInfo)), matched)
+    }
+
+    val totalCount = resultRdd.count
+    val matchRdd = resultRdd.filter(_._2).map(_._1)
+    val matchCount = matchRdd.count
+    val missRdd = resultRdd.filter(!_._2).map(_._1)
+    val missCount = missRdd.count
+
+    (ProfileResult(matchCount, totalCount), missRdd, matchRdd)
+
+  }
+
+  // try to match data against the rule, return true if matched, false if unmatched
+  private def matchData(data: V, ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
+
+    // 1. check valid
+    if (ruleAnalyzer.rule.valid(data)) {
+      // 2. substitute the cached data into statement, get the statement value
+      val matched = ruleAnalyzer.rule.calculate(data) match {
+        case Some(b: Boolean) => b
+        case _ => false
+      }
+      // currently we cannot get the mismatch reason; such information needs to be added to explain how a record mismatches
+      if (matched) (matched, Map[String, Any]())
+      else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
+    } else {
+      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
+    }
+
+  }
+
+}
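
ProfileCore is a single map over the source rows followed by a partition on the match flag; mismatching rows carry a reason in their info map. A Spark-free sketch of the same bookkeeping over a plain collection, with a toy rule (post_code must be null) standing in for ruleAnalyzer.rule.calculate:

```
object ProfileCoreSketch extends App {
  type V = Map[String, Any]

  // toy stand-in for the rule: matched when post_code is present and null
  def matchData(data: V): (Boolean, V) =
    if (data.get("post_code").contains(null)) (true, Map.empty)
    else (false, Map("__mismatch" -> "not matched"))  // toy key, not MismatchInfo.wrap

  val rows: Seq[V] = Seq(
    Map("user_id" -> 1, "post_code" -> null),
    Map("user_id" -> 2, "post_code" -> "95054")
  )

  val results = rows.map(r => (r, matchData(r)))
  val (matched, missed) = results.partition(_._2._1)

  println(s"match count: ${matched.size}, miss count: ${missed.size}, total count: ${rows.size}")
  // match count: 1, miss count: 1, total count: 2
}
```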

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
index de05eb3..03955d5 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
@@ -26,6 +26,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
   val ResultFile = filePath("_RESULT")
 
   val MissRecFile = filePath("_MISSREC")      // optional
+  val MatchRecFile = filePath("_MATCHREC")    // optional
 
   val LogFile = filePath("_LOG")
 
@@ -83,6 +84,9 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
         case ar: AccuracyResult => {
           s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
         }
+        case pr: ProfileResult => {
+          s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
+        }
         case _ => {
           s"result: ${result}"
         }
@@ -97,7 +101,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
   }
 
   // need to avoid string too long
-  def missRecords(records: RDD[String]): Unit = {
+  private def rddRecords(records: RDD[String], path: String): Unit = {
     try {
       val recordCount = records.count
       val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
@@ -105,7 +109,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
         val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
         if (groupCount <= 1) {
           val recs = records.take(count.toInt)
-          persistRecords(MissRecFile, recs)
+          persistRecords(path, recs)
         } else {
           val groupedRecords: RDD[(Long, Iterable[String])] =
             records.zipWithIndex.flatMap { r =>
@@ -114,7 +118,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
             }.groupByKey()
           groupedRecords.foreach { group =>
             val (gid, recs) = group
-            val hdfsPath = if (gid == 0) MissRecFile else withSuffix(MissRecFile, gid.toString)
+            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
             persistRecords(hdfsPath, recs)
           }
         }
@@ -124,6 +128,14 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
     }
   }
 
+  def missRecords(records: RDD[String]): Unit = {
+    rddRecords(records, MissRecFile)
+  }
+
+  def matchRecords(records: RDD[String]): Unit = {
+    rddRecords(records, MatchRecFile)
+  }
+
   private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = {
     val recStr = records.mkString("\n")
     HdfsUtil.appendContent(hdfsPath, recStr)
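
The refactored rddRecords caps output at maxPersistLines and shards it into files of at most maxLinesPerFile lines, suffixing every group after the first. The grouping arithmetic in isolation, with toy limits (the dot-suffix scheme below is an assumption; withSuffix is not shown in this diff):

```
object RecordShardingSketch extends App {
  val maxPersistLines = 10L   // assumed cap on persisted records
  val maxLinesPerFile = 4L    // assumed per-file limit

  val recordCount = 9L
  val count = if (maxPersistLines < 0) recordCount else math.min(maxPersistLines, recordCount)
  val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt  // 9 records -> 3 files

  // same rule as rddRecords: group 0 keeps the base path, later groups get a suffix
  def pathFor(base: String, gid: Long): String = if (gid == 0) base else s"$base.$gid"

  (0L until groupCount).foreach(gid => println(pathFor("_MATCHREC", gid)))
  // _MATCHREC, _MATCHREC.1, _MATCHREC.2
}
```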

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
index bb0ea6f..fed4878 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
@@ -23,29 +23,37 @@ case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp:
   def finish(): Unit = {}
 
   def result(rt: Long, result: Result): Unit = {
-    try {
-      result match {
-        case ar: AccuracyResult => {
-          val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
-          val data = JsonUtil.toJson(dataMap)
-
-          // post
-          val params = Map[String, Object]()
-//          val header = Map[String, Object](("content-type" -> "application/json"))
-          val header = Map[String, Object]()
-          val status = HttpUtil.httpRequest(api, method, params, header, data)
-          info(s"${method} to ${api} response status: ${status}")
-        }
-        case _ => {
-          info(s"result: ${result}")
-        }
+    result match {
+      case ar: AccuracyResult => {
+        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
+        httpResult(dataMap)
+      }
+      case pr: ProfileResult => {
+        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch))
+        httpResult(dataMap)
       }
+      case _ => {
+        info(s"result: ${result}")
+      }
+    }
+  }
+
+  private def httpResult(dataMap: Map[String, Any]) = {
+    try {
+      val data = JsonUtil.toJson(dataMap)
+      // post
+      val params = Map[String, Object]()
+      val header = Map[String, Object]()
+      val status = HttpUtil.httpRequest(api, method, params, header, data)
+      info(s"${method} to ${api} response status: ${status}")
     } catch {
       case e: Throwable => error(e.getMessage)
     }
+
   }
 
   def missRecords(records: RDD[String]): Unit = {}
+  def matchRecords(records: RDD[String]): Unit = {}
 
   def log(rt: Long, msg: String): Unit = {}
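
Both result types now post the same flat JSON payload through httpResult; only the counts differ. A sketch of the document it serializes, with hand-rolled JSON standing in for JsonUtil.toJson and made-up values:

```
object HttpPayloadSketch extends App {
  // same fields HttpPersist posts for AccuracyResult and ProfileResult
  val dataMap = Map[String, Any](
    "name" -> "prof1", "tmst" -> 1496383829000L, "total" -> 2L, "matched" -> 1L)

  // hand-rolled JSON, standing in for JsonUtil.toJson
  val json = dataMap.map {
    case (k, v: String) => s""""$k":"$v""""
    case (k, v)         => s""""$k":$v"""
  }.mkString("{", ",", "}")

  println(json)  // {"name":"prof1","tmst":1496383829000,"total":2,"matched":1}
}
```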
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
index 11973e9..1e5fbed 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
@@ -24,6 +24,7 @@ case class MultiPersists(persists: Iterable[Persist]) extends Persist {
   def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) }
 
   def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) }
+  def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) }
 
   def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
index 16a8edd..28dcd22 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
@@ -20,6 +20,7 @@ trait Persist extends Loggable with Serializable {
   def result(rt: Long, result: Result): Unit
 
   def missRecords(records: RDD[String]): Unit
+  def matchRecords(records: RDD[String]): Unit
 
   def log(rt: Long, msg: String): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
new file mode 100644
index 0000000..0b52bfb
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
@@ -0,0 +1,26 @@
+package org.apache.griffin.measure.batch.result
+
+// result for profile: match count, total count
+case class ProfileResult(matchCount: Long, totalCount: Long) extends Result {
+
+  type T = ProfileResult
+
+  def update(delta: T): T = {
+    ProfileResult(matchCount + delta.matchCount, totalCount)
+  }
+
+  def eventual(): Boolean = {
+    this.matchCount >= totalCount
+  }
+
+  def differsFrom(other: T): Boolean = {
+    (this.matchCount != other.matchCount) || (this.totalCount != other.totalCount)
+  }
+
+  def getMiss = totalCount - matchCount
+  def getTotal = totalCount
+  def getMatch = matchCount
+
+  def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
+
+}
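
A quick check of the result arithmetic, including the guarded zero-total case:

```
object ProfileResultSketch extends App {
  case class ProfileResult(matchCount: Long, totalCount: Long) {
    def getMatch = matchCount
    def getTotal = totalCount
    def getMiss = totalCount - matchCount
    def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
  }

  println(ProfileResult(1, 2).matchPercentage)  // 50.0
  println(ProfileResult(0, 0).matchPercentage)  // 0.0, no divide-by-zero
}
```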

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
index e4e7bbf..9f2e29d 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
@@ -13,6 +13,8 @@ object CalculationUtil {
     def + (other: Option[_]): Option[_] = {
       Try {
         (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
           case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString)
           case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte)
           case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
@@ -20,18 +22,19 @@ object CalculationUtil {
           case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
           case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
           case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
-          case (None, Some(v2)) => other
           case _ => value
         }
       } match {
         case Success(opt) => opt
-        case _ => value
+        case _ => None
       }
     }
 
     def - (other: Option[_]): Option[_] = {
       Try {
         (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
           case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
           case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
           case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
@@ -42,13 +45,15 @@ object CalculationUtil {
         }
       } match {
         case Success(opt) => opt
-        case _ => value
+        case _ => None
       }
     }
 
     def * (other: Option[_]): Option[_] = {
       Try {
         (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
           case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
           case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
           case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
@@ -61,13 +66,15 @@ object CalculationUtil {
         }
       } match {
         case Success(opt) => opt
-        case _ => value
+        case _ => None
       }
     }
 
     def / (other: Option[_]): Option[_] = {
       Try {
         (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
           case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
           case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
           case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
@@ -78,13 +85,15 @@ object CalculationUtil {
         }
       } match {
         case Success(opt) => opt
-        case _ => value
+        case _ => None
       }
     }
 
     def % (other: Option[_]): Option[_] = {
       Try {
         (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
           case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
           case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
           case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
@@ -95,12 +104,14 @@ object CalculationUtil {
         }
       } match {
         case Success(opt) => opt
-        case _ => value
+        case _ => None
       }
     }
 
     def unary_- (): Option[_] = {
       value match {
+        case None => None
+        case Some(null) => None
         case Some(v: String) => Some(v.reverse.toString)
         case Some(v: Boolean) => Some(!v)
         case Some(v: Byte) => Some(-v)
@@ -117,16 +128,16 @@ object CalculationUtil {
 
     def === (other: Option[_]): Option[Boolean] = {
       (value, other) match {
-        case (Some(v1), Some(v2)) => Some(v1 == v2)
         case (None, None) => Some(true)
+        case (Some(v1), Some(v2)) => Some(v1 == v2)
         case _ => Some(false)
       }
     }
 
     def =!= (other: Option[_]): Option[Boolean] = {
       (value, other) match {
-        case (Some(v1), Some(v2)) => Some(v1 != v2)
         case (None, None) => Some(false)
+        case (Some(v1), Some(v2)) => Some(v1 != v2)
         case _ => Some(true)
       }
     }
@@ -134,6 +145,8 @@ object CalculationUtil {
     def > (other: Option[_]): Option[Boolean] = {
       Try {
         (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
           case (Some(v1: String), Some(v2: String)) => Some(v1 > v2)
           case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble)
           case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
@@ -152,7 +165,9 @@ object CalculationUtil {
     def >= (other: Option[_]): Option[Boolean] = {
       Try {
         (value, other) match {
-          case (None, None) => Some(true)
+          case (None, None) | (Some(null), Some(null)) => Some(true)
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
           case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
           case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
           case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
@@ -171,6 +186,8 @@ object CalculationUtil {
     def < (other: Option[_]): Option[Boolean] = {
       Try {
         (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
           case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
           case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
           case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
@@ -189,7 +206,9 @@ object CalculationUtil {
     def <= (other: Option[_]): Option[Boolean] = {
       Try {
         (value, other) match {
-          case (None, None) => Some(true)
+          case (None, None) | (Some(null), Some(null)) => Some(true)
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
           case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
           case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
           case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
@@ -249,12 +268,16 @@ object CalculationUtil {
 
     private def optNot(a: Option[_]): Option[Boolean] = {
       a match {
+        case None => None
+        case Some(null) => None
         case Some(v: Boolean) => Some(!v)
         case _ => None
       }
     }
     private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = {
       (a, b) match {
+        case (None, _) | (_, None) => None
+        case (Some(null), _) | (_, Some(null)) => None
         case (Some(false), _) | (_, Some(false)) => Some(false)
         case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
         case _ => None
@@ -262,6 +285,8 @@ object CalculationUtil {
     }
     private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = {
       (a, b) match {
+        case (None, _) | (_, None) => None
+        case (Some(null), _) | (_, Some(null)) => None
         case (Some(true), _) | (_, Some(true)) => Some(true)
         case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
         case _ => None
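
The recurring new cases give the operators three-valued semantics: None and Some(null) poison arithmetic and ordered comparisons, while === and =!= stay total and treat two absent values as equal. A condensed sketch of that behavior for + and ===:

```
object OptionCalcSketch extends App {
  // same propagation rule as the patched + operator (Int case only)
  def plus(a: Option[_], b: Option[_]): Option[_] = (a, b) match {
    case (None, _) | (_, None)             => None
    case (Some(null), _) | (_, Some(null)) => None
    case (Some(v1: Int), Some(v2))         => Some(v1 + v2.toString.toInt)
    case _                                 => a
  }

  // === keeps its total semantics: two absent values compare equal
  def eqq(a: Option[_], b: Option[_]): Option[Boolean] = (a, b) match {
    case (None, None)         => Some(true)
    case (Some(v1), Some(v2)) => Some(v1 == v2)
    case _                    => Some(false)
  }

  println(plus(Some(1), Some(2)))      // Some(3)
  println(plus(Some(1), None))         // None
  println(plus(Some(1), Some(null)))   // None
  println(eqq(None, None))             // Some(true)
  println(eqq(Some(null), Some(null))) // Some(true): null == null
}
```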

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
index 47bf38e..49094e8 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
@@ -120,12 +120,13 @@ case class RuleParser() extends JavaTokenParsers with Serializable {
   import SomeNumber._
 
   // -- literal --
-  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull
+  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull | literialNone
   def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
   def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
   def literialTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
   def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
-  def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r | """(?i)none""".r) ^^ { LiteralNullExpr(_) }
+  def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r) ^^ { LiteralNullExpr(_) }
+  def literialNone: Parser[LiteralNoneExpr] = """(?i)none""".r ^^ { LiteralNoneExpr(_) }
 
   // -- selection --
   // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
index bbfac8b..8bf0dd6 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
@@ -68,6 +68,11 @@ case class LiteralBooleanExpr(expr: String) extends LiteralExpr {
 }
 
 case class LiteralNullExpr(expr: String) extends LiteralExpr {
-  val value: Option[Any] = None
+  val value: Option[Any] = Some(null)
   val desc: String = "null"
+}
+
+case class LiteralNoneExpr(expr: String) extends LiteralExpr {
+  val value: Option[Any] = None
+  val desc: String = "none"
 }
\ No newline at end of file
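
With this change the rule language distinguishes null (a present value that happens to be null, now parsed to Some(null)) from none (an absent value, parsed to None). Under the === semantics above, a rule like $source.post_code == null holds exactly when the field exists and is null. A sketch, with hypothetical field lookups:

```
object NullVsNoneSketch extends App {
  val literalNull: Option[Any] = Some(null)  // LiteralNullExpr.value
  val literalNone: Option[Any] = None        // LiteralNoneExpr.value

  def eqq(a: Option[_], b: Option[_]): Option[Boolean] = (a, b) match {
    case (None, None)         => Some(true)
    case (Some(v1), Some(v2)) => Some(v1 == v2)
    case _                    => Some(false)
  }

  val presentNull: Option[Any] = Some(null)  // row has post_code = null
  val absent: Option[Any]      = None        // row has no post_code at all

  println(eqq(presentNull, literalNull))  // Some(true)
  println(eqq(absent, literalNull))       // Some(false): absent is not null
  println(eqq(absent, literalNone))       // Some(true)
}
```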

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/test/resources/config-profile.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/config-profile.json b/measure/measure-batch/src/test/resources/config-profile.json
new file mode 100644
index 0000000..6b82d7f
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/config-profile.json
@@ -0,0 +1,17 @@
+{
+  "name": "prof1",
+  "type": "profile",
+
+  "source": {
+    "type": "avro",
+    "version": "1.7",
+    "config": {
+      "file.name": "src/test/resources/users_info_src.avro"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 1,
+    "rules": "$source.post_code == null"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
new file mode 100644
index 0000000..a1645f3
--- /dev/null
+++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
@@ -0,0 +1,149 @@
+//package org.apache.griffin.measure.batch.algo
+//
+//import java.util.Date
+//
+//import org.apache.griffin.measure.batch.config.params._
+//import org.apache.griffin.measure.batch.config.params.env._
+//import org.apache.griffin.measure.batch.config.params.user._
+//import org.apache.griffin.measure.batch.config.reader._
+//import org.apache.griffin.measure.batch.config.validator._
+//import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
+//import org.apache.griffin.measure.batch.log.Loggable
+//import org.apache.griffin.measure.batch.rule.expr._
+//import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+//  val envFile = "src/test/resources/env.json"
+//  val confFile = "src/test/resources/config-profile.json"
+//  val envFsType = "local"
+//  val userFsType = "local"
+//
+//  val args = Array(envFile, confFile)
+//
+//  var sc: SparkContext = _
+//  var sqlContext: SQLContext = _
+//
+//  var allParam: AllParam = _
+//
+//  before {
+//    // read param files
+//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    allParam = AllParam(envParam, userParam)
+//
+//    // validate param files
+//    validateParams(allParam) match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-3)
+//      }
+//      case _ => {
+//        info("params validation pass")
+//      }
+//    }
+//
+//    val metricName = userParam.name
+//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+//    sc = new SparkContext(conf)
+//    sqlContext = new SQLContext(sc)
+//  }
+//
+//  test("algorithm") {
+//    Try {
+//      val envParam = allParam.envParam
+//      val userParam = allParam.userParam
+//
+//      // start time
+//      val startTime = new Date().getTime()
+//
+//      // get spark application id
+//      val applicationId = sc.applicationId
+//
+//      // rules
+//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+//      val rule: StatementExpr = ruleFactory.generateRule()
+//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+//      ruleAnalyzer.constCacheExprs.foreach(println)
+//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
+//
+//      // global cache data
+//      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//
+//      // data connector
+//      val sourceDataConnector: DataConnector =
+//        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+//          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("source data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//
+//      // get data
+//      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//
+//      // my algo
+//      val algo = BatchProfileAlgo(allParam)
+//
+//      // profile algorithm
+//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
+//
+//      println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
+//
+//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
+//
+//      // end time
+//      val endTime = new Date().getTime
+//      println(s"using time: ${endTime - startTime} ms")
+//    } match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-4)
+//      }
+//      case _ => {
+//        info("calculation finished")
+//      }
+//    }
+//  }
+//
+//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+//    paramReader.readConfig[T]
+//  }
+//
+//  private def validateParams(allParam: AllParam): Try[Boolean] = {
+//    val allParamValidator = AllParamValidator()
+//    allParamValidator.validate(allParam)
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/service/src/main/resources/application-dev.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application-dev.properties b/service/src/main/resources/application-dev.properties
new file mode 100644
index 0000000..12f8865
--- /dev/null
+++ b/service/src/main/resources/application-dev.properties
@@ -0,0 +1,22 @@
+spring.datasource.url= jdbc:mysql://localhost:3306/quartz?autoReconnect=true&useSSL=false
+spring.datasource.username =griffin
+spring.datasource.password =123456
+
+spring.datasource.driver-class-name=com.mysql.jdbc.Driver
+
+## Hibernate ddl auto (validate,create, create-drop, update)
+
+spring.jpa.hibernate.ddl-auto = create-drop
+spring.jpa.show-sql=true
+spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
+#
+#
+## Naming strategy
+spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy
+
+# hive metastore
+hive.metastore.uris = thrift://10.9.246.187:9083
+hive.metastore.dbname = default
+
+# kafka schema registry
+kafka.schema.registry.url = http://10.65.159.119:8081

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1cca069d/service/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index 12f8865..685ca8a 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -15,8 +15,8 @@ spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
 spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy
 
 # hive metastore
-hive.metastore.uris = thrift://10.9.246.187:9083
+hive.metastore.uris = thrift://localhost:9083
 hive.metastore.dbname = default
 
 # kafka schema registry
-kafka.schema.registry.url = http://10.65.159.119:8081
+kafka.schema.registry.url = http://localhost:8081