Posted to commits@griffin.apache.org by gu...@apache.org on 2017/06/02 09:31:53 UTC

[4/6] incubator-griffin git commit: griffin-measure package modification

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/users_info_target.dat
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_target.dat b/measure/measure-batch/src/test/resources/users_info_target.dat
deleted file mode 100644
index 07a6b40..0000000
--- a/measure/measure-batch/src/test/resources/users_info_target.dat
+++ /dev/null
@@ -1,50 +0,0 @@
-10001|Tom001|Jerrya|201 DisneyCity|tomajerrya001@dc.org|10000101|94022
-10002|Tom002|Jerrya|202 DisneyCity|tomajerrya002@dc.org|10000102|94022
-10003|Tom003|Jerrya|203 DisneyCity|tomajerrya003@dc.org|10000003|94022
-10004|Tom004|Jerrya|204 DisneyCity|tomajerrya004@dc.org|10000004|94022
-10005|Tom005|Jerrya|205 DisneyCity|tomajerrya005@dc.org|10000005|94022
-10006|Tom006|Jerrya|206 DisneyCity|tomajerrya006@dc.org|10000006|94022
-10007|Tom007|Jerrya|207 DisneyCity|tomajerrya007@dc.org|10000007|94022
-10008|Tom008|Jerrya|208 DisneyCity|tomajerrya008@dc.org|10000008|94022
-10009|Tom009|Jerrya|209 DisneyCity|tomajerrya009@dc.org|10000009|94022
-10010|Tom010|Jerrya|210 DisneyCity|tomajerrya010@dc.org|10000010|94022
-10011|Tom011|Jerrya|211 DisneyCity|tomajerrya011@dc.org|10000011|94022
-10012|Tom012|Jerrya|212 DisneyCity|tomajerrya012@dc.org|10000012|94022
-10013|Tom013|Jerrya|213 DisneyCity|tomajerrya013@dc.org|10000013|94022
-10014|Tom014|Jerrya|214 DisneyCity|tomajerrya014@dc.org|10000014|94022
-10015|Tom015|Jerrya|215 DisneyCity|tomajerrya015@dc.org|10000015|94022
-10016|Tom016|Jerrya|216 DisneyCity|tomajerrya016@dc.org|10000016|94022
-10017|Tom017|Jerrya|217 DisneyCity|tomajerrya017@dc.org|10000017|94022
-10018|Tom018|Jerrya|218 DisneyCity|tomajerrya018@dc.org|10000018|94022
-10019|Tom019|Jerrya|219 DisneyCity|tomajerrya019@dc.org|10000019|94022
-10020|Tom020|Jerrya|220 DisneyCity|tomajerrya020@dc.org|10000020|94022
-10021|Tom021|Jerrya|221 DisneyCity|tomajerrya021@dc.org|10000021|94022
-10022|Tom022|Jerrya|222 DisneyCity|tomajerrya022@dc.org|10000022|94022
-10023|Tom023|Jerrya|223 DisneyCity|tomajerrya023@dc.org|10000023|94022
-10024|Tom024|Jerrya|224 DisneyCity|tomajerrya024@dc.org|10000024|94022
-10025|Tom025|Jerrya|225 DisneyCity|tomajerrya025@dc.org|10000025|94022
-10026|Tom026|Jerrya|226 DisneyCity|tomajerrya026@dc.org|10000026|94022
-10027|Tom027|Jerrya|227 DisneyCity|tomajerrya027@dc.org|10000027|94022
-10028|Tom028|Jerrya|228 DisneyCity|tomajerrya028@dc.org|10000028|94022
-10029|Tom029|Jerrya|229 DisneyCity|tomajerrya029@dc.org|10000029|94022
-10030|Tom030|Jerrya|230 DisneyCity|tomajerrya030@dc.org|10000030|94022
-10031|Tom031|Jerrya|231 DisneyCity|tomajerrya031@dc.org|10000031|94022
-10032|Tom032|Jerrya|232 DisneyCity|tomajerrya032@dc.org|10000032|94022
-10033|Tom033|Jerrya|233 DisneyCity|tomajerrya033@dc.org|10000033|94022
-10034|Tom034|Jerrya|234 DisneyCity|tomajerrya034@dc.org|10000034|94022
-10035|Tom035|Jerrya|235 DisneyCity|tomajerrya035@dc.org|10000035|94022
-10036|Tom036|Jerrya|236 DisneyCity|tomajerrya036@dc.org|10000036|94022
-10037|Tom037|Jerrya|237 DisneyCity|tomajerrya037@dc.org|10000037|94022
-10038|Tom038|Jerrya|238 DisneyCity|tomajerrya038@dc.org|10000038|94022
-10039|Tom039|Jerrya|239 DisneyCity|tomajerrya039@dc.org|10000039|94022
-10040|Tom040|Jerrya|240 DisneyCity|tomajerrya040@dc.org|10000040|94022
-10041|Tom041|Jerrya|241 DisneyCity|tomajerrya041@dc.org|10000041|94022
-10042|Tom042|Jerrya|242 DisneyCity|tomajerrya042@dc.org|10000042|94022
-10043|Tom043|Jerrya|243 DisneyCity|tomajerrya043@dc.org|10000043|94022
-10044|Tom044|Jerrya|244 DisneyCity|tomajerrya044@dc.org|10000044|94022
-10045|Tom045|Jerrya|245 DisneyCity|tomajerrya045@dc.org|10000045|94022
-10046|Tom046|Jerrya|246 DisneyCity|tomajerrya046@dc.org|10000046|94022
-10047|Tom047|Jerrya|247 DisneyCity|tomajerrya047@dc.org|10000047|94022
-10048|Tom048|Jerrya|248 DisneyCity|tomajerrya048@dc.org|10000048|94022
-10049|Tom049|Jerrya|249 DisneyCity|tomajerrya049@dc.org|10000049|94022
-10050|Tom050|Jerrya|250 DisneyCity|tomajerrya050@dc.org|10000050|94022
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
deleted file mode 100644
index 1c99491..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-//package org.apache.griffin.measure.batch.algo
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.batch.config.params._
-//import org.apache.griffin.measure.batch.config.params.env._
-//import org.apache.griffin.measure.batch.config.params.user._
-//import org.apache.griffin.measure.batch.config.reader._
-//import org.apache.griffin.measure.batch.config.validator._
-//import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.batch.log.Loggable
-//import org.apache.griffin.measure.batch.rule.expr._
-//import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env.json"
-////  val confFile = "src/test/resources/config.json"
-//  val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}"
-//  val envFsType = "local"
-//  val userFsType = "raw"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var sc: SparkContext = _
-//  var sqlContext: SQLContext = _
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//
-//    val metricName = userParam.name
-//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-//    sc = new SparkContext(conf)
-//    sqlContext = new SQLContext(sc)
-//  }
-//
-//  test("algorithm") {
-//    Try {
-//      val envParam = allParam.envParam
-//      val userParam = allParam.userParam
-//
-//      // start time
-//      val startTime = new Date().getTime()
-//
-//      // get spark application id
-//      val applicationId = sc.applicationId
-//
-//      // rules
-//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-//      val rule: StatementExpr = ruleFactory.generateRule()
-//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-//      ruleAnalyzer.constCacheExprs.foreach(println)
-//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-//      // global cache data
-//      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-//
-//      // data connector
-//      val sourceDataConnector: DataConnector =
-//        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
-//          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("source data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//      val targetDataConnector: DataConnector =
-//        DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam,
-//          ruleAnalyzer.targetRuleExprs, finalConstExprValueMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("target data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//
-//      // get metadata
-////      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-////        case Success(md) => md
-////        case Failure(ex) => throw ex
-////      }
-////      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
-////        case Success(md) => md
-////        case Failure(ex) => throw ex
-////      }
-//
-//      // get data
-//      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//      val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//
-//      // my algo
-//      val algo = BatchAccuracyAlgo(allParam)
-//
-//      // accuracy algorithm
-//      val (accuResult, missingRdd, matchedRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer)
-//
-//      println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}")
-//
-//      missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)).foreach(println)
-//
-//      // end time
-//      val endTime = new Date().getTime
-//      println(s"using time: ${endTime - startTime} ms")
-//    } match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-4)
-//      }
-//      case _ => {
-//        info("calculation finished")
-//      }
-//    }
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
deleted file mode 100644
index a1645f3..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-//package org.apache.griffin.measure.batch.algo
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.batch.config.params._
-//import org.apache.griffin.measure.batch.config.params.env._
-//import org.apache.griffin.measure.batch.config.params.user._
-//import org.apache.griffin.measure.batch.config.reader._
-//import org.apache.griffin.measure.batch.config.validator._
-//import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.batch.log.Loggable
-//import org.apache.griffin.measure.batch.rule.expr._
-//import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env.json"
-//  val confFile = "src/test/resources/config-profile.json"
-//  val envFsType = "local"
-//  val userFsType = "local"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var sc: SparkContext = _
-//  var sqlContext: SQLContext = _
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//
-//    val metricName = userParam.name
-//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-//    sc = new SparkContext(conf)
-//    sqlContext = new SQLContext(sc)
-//  }
-//
-//  test("algorithm") {
-//    Try {
-//      val envParam = allParam.envParam
-//      val userParam = allParam.userParam
-//
-//      // start time
-//      val startTime = new Date().getTime()
-//
-//      // get spark application id
-//      val applicationId = sc.applicationId
-//
-//      // rules
-//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-//      val rule: StatementExpr = ruleFactory.generateRule()
-//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-//      ruleAnalyzer.constCacheExprs.foreach(println)
-//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-//      // global cache data
-//      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-//
-//      // data connector
-//      val sourceDataConnector: DataConnector =
-//        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
-//          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("source data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//
-//      // get data
-//      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//
-//      // my algo
-//      val algo = BatchProfileAlgo(allParam)
-//
-//      // profile algorithm
-//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
-//
-//      println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
-//
-//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
-//
-//      // end time
-//      val endTime = new Date().getTime
-//      println(s"using time: ${endTime - startTime} ms")
-//    } match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-4)
-//      }
-//      case _ => {
-//        info("calculation finished")
-//      }
-//    }
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala
deleted file mode 100644
index 4b7f850..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.griffin.measure.batch.algo.core
-
-import org.apache.griffin.measure.batch.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.griffin.measure.batch.rule.{RuleAnalyzer, RuleFactory}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-import org.scalatest.PrivateMethodTester
-
-@RunWith(classOf[JUnitRunner])
-class AccuracyCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
-
-  def findExprId(exprs: Iterable[Expr], desc: String): String = {
-    exprs.find(_.desc == desc) match {
-      case Some(expr) => expr._id
-      case _ => ""
-    }
-  }
-
-  test ("match data success") {
-    val rule = "$source.name = $target.name AND $source.age < $target.age"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
-    ), Map[String, Any]())
-    val target = (Map[String, Any](
-      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
-      (findExprId(targetPersistExprs, "$target['age']") -> 27)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
-    result._1 should be (true)
-    result._2.size should be (0)
-  }
-
-  test ("match data fail") {
-    val rule = "$source.name = $target.name AND $source.age = $target.age"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
-    ), Map[String, Any]())
-    val target = (Map[String, Any](
-      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
-      (findExprId(targetPersistExprs, "$target['age']") -> 27)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
-    result._1 should be (false)
-    result._2.size shouldNot be (0)
-  }
-
-}

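The test above reaches into AccuracyCore's private matchData through ScalaTest's PrivateMethodTester. For reference, the same pattern in isolation, as a minimal sketch (the Counter object here is hypothetical, not part of Griffin):

    import org.scalatest.{FunSuite, PrivateMethodTester}

    object Counter {
      private def bump(n: Int): Int = n + 1  // private method under test
    }

    // Same PrivateMethod/invokePrivate pattern as the matchData calls above.
    class PrivateMethodSketch extends FunSuite with PrivateMethodTester {
      test("invoke a private method") {
        val bump = PrivateMethod[Int]('bump)
        assert((Counter invokePrivate bump(41)) == 42)
      }
    }
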
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala
deleted file mode 100644
index 6e5ddf5..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.config.params.env._
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  test("read config") {
-    val rawString = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}"""
-
-    val reader = ParamRawStringReader(rawString)
-    val paramTry = reader.readConfig[PersistParam]
-    paramTry.isSuccess should be (true)
-    paramTry.get should be (PersistParam("hdfs", Map[String, Any](("path" -> "/path/to"), ("time" -> 1234567))))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala
deleted file mode 100644
index 23c2150..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.griffin.measure.batch.config.validator
-
-import org.apache.griffin.measure.batch.config.params._
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
-import org.scalamock.scalatest.MockFactory
-
-@RunWith(classOf[JUnitRunner])
-class AllParamValidatorTest extends FlatSpec with Matchers with BeforeAndAfter with MockFactory {
-
-  "validate" should "pass" in {
-    val validator = AllParamValidator()
-    val paramMock = mock[Param]
-    paramMock.validate _ expects () returning (false)
-
-    val validateTry = validator.validate(paramMock)
-    validateTry.isSuccess should be (true)
-    validateTry.get should be (false)
-  }
-
-}

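The mocked expectation above uses ScalaMock's expects/returning syntax. A self-contained sketch of the same pattern (the Greeter trait is made up for illustration):

    import org.scalamock.scalatest.MockFactory
    import org.scalatest.FlatSpec

    trait Greeter { def greet(name: String): String }  // hypothetical trait

    // Same expects/returning stubbing as the mock[Param] usage above.
    class GreeterSpec extends FlatSpec with MockFactory {
      "a mocked greeter" should "return the stubbed value" in {
        val greeter = mock[Greeter]
        (greeter.greet _).expects("griffin").returning("hello griffin")
        assert(greeter.greet("griffin") == "hello griffin")
      }
    }
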
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala
deleted file mode 100644
index 3dd4c91..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.griffin.measure.batch.persist
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-import scala.util.{Try, Failure}
-
-@RunWith(classOf[JUnitRunner])
-class HdfsPersistTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val config: Map[String, Any] = Map[String, Any](
-    ("path" -> "/path/to"), ("max.persist.lines" -> 100), ("max.lines.per.file" -> 1000))
-  val metricName: String = "metric"
-  val timeStamp: Long = 123456789L
-
-  val hdfsPersist = HdfsPersist(config, metricName, timeStamp)
-
-  test ("constructor") {
-    hdfsPersist.path should be ("/path/to")
-    hdfsPersist.maxPersistLines should be (100)
-    hdfsPersist.maxLinesPerFile should be (1000)
-
-    hdfsPersist.StartFile should be (s"/path/to/${metricName}/${timeStamp}/_START")
-  }
-
-  test ("available") {
-    hdfsPersist.available should be (true)
-  }
-}

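The assertions above pin down the HDFS persist layout: one directory per metric name, one per timestamp, with a _START marker file inside. The implied path construction, as a short sketch:

    // Path layout implied by the StartFile assertion above (sketch only).
    def startFilePath(basePath: String, metricName: String, timeStamp: Long): String =
      s"${basePath}/${metricName}/${timeStamp}/_START"

    // startFilePath("/path/to", "metric", 123456789L)
    //   == "/path/to/metric/123456789/_START"
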
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala
deleted file mode 100644
index 0453680..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.griffin.measure.batch.persist
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class HttpPersistTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val config: Map[String, Any] = Map[String, Any](("api" -> "url/api"), ("method" -> "post"))
-  val metricName: String = "metric"
-  val timeStamp: Long = 123456789L
-
-  val httpPersist = HttpPersist(config, metricName, timeStamp)
-
-  test ("constructor") {
-    httpPersist.api should be ("url/api")
-    httpPersist.method should be ("post")
-  }
-
-  test("available") {
-    httpPersist.available should be (true)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/result/AccuracyResultTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/result/AccuracyResultTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/result/AccuracyResultTest.scala
deleted file mode 100644
index b0f96c3..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/result/AccuracyResultTest.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.griffin.measure.batch.result
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class AccuracyResultTest extends FunSuite with BeforeAndAfter with Matchers {
-
-  test ("update") {
-    val result = AccuracyResult(10, 100)
-    val delta = AccuracyResult(3, 10)
-    result.update(delta) should be (AccuracyResult(3, 100))
-  }
-
-  test ("eventual") {
-    val result1 = AccuracyResult(10, 100)
-    result1.eventual should be (false)
-
-    val result2 = AccuracyResult(0, 100)
-    result2.eventual should be (true)
-  }
-
-  test ("differsFrom") {
-    val result = AccuracyResult(10, 100)
-    result.differsFrom(AccuracyResult(11, 100)) should be (true)
-    result.differsFrom(AccuracyResult(10, 110)) should be (true)
-    result.differsFrom(AccuracyResult(10, 100)) should be (false)
-  }
-
-  test ("matchPercentage") {
-    val result1 = AccuracyResult(10, 100)
-    result1.matchPercentage should be (90.0)
-
-    val result2 = AccuracyResult(10, 0)
-    result2.matchPercentage should be (0.0)
-  }
-
-}

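Taken together, the assertions above fix AccuracyResult's semantics: the first field is the miss count, the second the total; update keeps the original total but adopts the delta's miss count; eventual holds once nothing is missing; and matchPercentage guards against a zero total. A hypothetical stand-in that satisfies exactly these tests (not the project class):

    // Stand-in satisfying the assertions above; the real class lives in
    // org.apache.griffin.measure.batch.result.
    case class AccuracyResultSketch(miss: Long, total: Long) {
      def update(delta: AccuracyResultSketch) = AccuracyResultSketch(delta.miss, total)
      def eventual: Boolean = miss <= 0
      def differsFrom(other: AccuracyResultSketch): Boolean = this != other
      def matchPercentage: Double =
        if (total <= 0) 0.0 else (total - miss).toDouble / total * 100
    }

    // AccuracyResultSketch(10, 100).matchPercentage == 90.0, as asserted above.
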
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerTest.scala
deleted file mode 100644
index f59a618..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerTest.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.apache.griffin.measure.batch.rule
-
-import org.apache.griffin.measure.batch.config.params.user.EvaluateRuleParam
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class RuleAnalyzerTest extends FunSuite with BeforeAndAfter with Matchers {
-
-  test ("rule analyze") {
-    val rule = "$source.name = $target.name AND $source.age = $target.age + (2 * 5) WHEN $source.born > (6 - 2 * 2)"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    ruleAnalyzer.constCacheExprs.map(_.desc) should be (List[String]("2 * 5", "2 * 2", "6 - 2 * 2"))
-    ruleAnalyzer.constFinalCacheExprs.map(_.desc) should be (Set[String]("2 * 5", "6 - 2 * 2"))
-
-    ruleAnalyzer.sourceRuleExprs.groupbyExprs.map(_.desc) should be (List[String](
-      "$source['name']", "$source['age']"))
-    ruleAnalyzer.sourceRuleExprs.cacheExprs.map(_.desc) should be (List[String](
-      "$source['name']", "$source['age']", "$source['born']", "$source['born'] > 6 - 2 * 2"))
-    ruleAnalyzer.sourceRuleExprs.finalCacheExprs.map(_.desc) should be (Set[String](
-      "$source['name']", "$source['age']", "$source['born']", "$source['born'] > 6 - 2 * 2"))
-    ruleAnalyzer.sourceRuleExprs.persistExprs.map(_.desc) should be (List[String](
-      "$source['name']", "$source['age']", "$source['born']"))
-    ruleAnalyzer.sourceRuleExprs.whenClauseExprOpt.map(_.desc) should be (Some(
-      "$source['born'] > 6 - 2 * 2"))
-
-    ruleAnalyzer.targetRuleExprs.groupbyExprs.map(_.desc) should be (List[String](
-      "$target['name']", "$target['age'] + 2 * 5"))
-    ruleAnalyzer.targetRuleExprs.cacheExprs.map(_.desc) should be (List[String](
-      "$target['name']", "$target['age']", "$target['age'] + 2 * 5"))
-    ruleAnalyzer.targetRuleExprs.finalCacheExprs.map(_.desc) should be (Set[String](
-      "$target['name']", "$target['age']", "$target['age'] + 2 * 5"))
-    ruleAnalyzer.targetRuleExprs.persistExprs.map(_.desc) should be (List[String](
-      "$target['name']", "$target['age']"))
-    ruleAnalyzer.targetRuleExprs.whenClauseExprOpt.map(_.desc) should be (Some(
-      "$source['born'] > 6 - 2 * 2"))
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleFactoryTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleFactoryTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleFactoryTest.scala
deleted file mode 100644
index 065f1f6..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleFactoryTest.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.griffin.measure.batch.rule
-
-import org.apache.griffin.measure.batch.config.params.user.EvaluateRuleParam
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class RuleFactoryTest extends FunSuite with BeforeAndAfter with Matchers {
-
-  test ("generate rule") {
-    val rule = "$source.name = $target.name AND $source.age = $target.age"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    ruleFactory.generateRule.desc should be ("$source['name'] = $target['name'] AND $source['age'] = $target['age']")
-
-    val wrong_rule = "$source.name = $target.name AND $source.age = $target1.age"
-    val evaluateRuleParam1 = EvaluateRuleParam(1.0, wrong_rule)
-    val ruleFactory1 = RuleFactory(evaluateRuleParam1)
-    val thrown = intercept[Exception] {
-      ruleFactory1.generateRule
-    }
-    thrown.getMessage should be ("parse rule error!")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
deleted file mode 100644
index aab5a1a..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-package org.apache.griffin.measure.batch.rule
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//import org.scalatest.FlatSpec
-//import org.scalamock.scalatest.MockFactory
-
-@RunWith(classOf[JUnitRunner])
-class RuleParserTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val ruleParser = RuleParser()
-
-  test ("literal number") {
-    val rule1 = "123"
-    val result1 = ruleParser.parseAll(ruleParser.literal, rule1)
-    result1.successful should be (true)
-    result1.get.value should be (Some(123))
-
-    val rule2 = "12.3"
-    val result2 = ruleParser.parseAll(ruleParser.literal, rule2)
-    result2.successful should be (true)
-    result2.get.value should be (Some(12.3))
-  }
-
-  test ("literal string") {
-    val rule1 = "'123'"
-    val result1 = ruleParser.parseAll(ruleParser.literal, rule1)
-    result1.successful should be (true)
-    result1.get.value should be (Some("123"))
-
-    val rule2 = "\"123\""
-    val result2 = ruleParser.parseAll(ruleParser.literal, rule2)
-    result2.successful should be (true)
-    result2.get.value should be (Some("123"))
-  }
-
-  test ("literal time") {
-    val rule = "3h"
-    val result = ruleParser.parseAll(ruleParser.literal, rule)
-    result.successful should be (true)
-    result.get.value should be (Some(3*3600*1000))
-  }
-
-  test ("literal boolean") {
-    val rule = "true"
-    val result = ruleParser.parseAll(ruleParser.literal, rule)
-    result.successful should be (true)
-    result.get.value should be (Some(true))
-  }
-
-  test ("literal null") {
-    val rule = "null"
-    val result = ruleParser.parseAll(ruleParser.literal, rule)
-    result.successful should be (true)
-    result.get.value should be (None)
-  }
-
-  test ("selection head") {
-    val rule = "$source"
-    val result = ruleParser.parseAll(ruleParser.selectionHead, rule)
-    result.successful should be (true)
-    result.get.head should be ("source")
-  }
-
-  test ("field select") {
-    val rule = ".name"
-    val result = ruleParser.parseAll(ruleParser.selector, rule)
-    result.successful should be (true)
-    result.get.desc should be ("['name']")
-  }
-
-  test ("function operation") {
-    val rule = ".func(1, 'abc', 3 + 4)"
-    val result = ruleParser.parseAll(ruleParser.selector, rule)
-    result.successful should be (true)
-    result.get.desc should be (".func(1, 'abc', 3 + 4)")
-  }
-
-  test ("index field range select") {
-    val rule1 = "['field']"
-    val result1 = ruleParser.parseAll(ruleParser.selector, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("['field']")
-
-    val rule2 = "[1, 4]"
-    val result2 = ruleParser.parseAll(ruleParser.selector, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("[1, 4]")
-
-    val rule3 = "[1, 'name', 'age', 5, (6, 8)]"
-    val result3 = ruleParser.parseAll(ruleParser.selector, rule3)
-    result3.successful should be (true)
-    result3.get.desc should be ("[1, 'name', 'age', 5, (6, 8)]")
-  }
-
-  test ("index field range") {
-    val rule1 = "(3, 5)"
-    val result1 = ruleParser.parseAll(ruleParser.indexFieldRange, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("(3, 5)")
-
-    val rule2 = "'name'"
-    val result2 = ruleParser.parseAll(ruleParser.indexFieldRange, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("'name'")
-
-    val rule3 = "*"
-    val result3 = ruleParser.parseAll(ruleParser.indexFieldRange, rule3)
-    result3.successful should be (true)
-    result3.get.desc should be ("*")
-  }
-
-  test ("filter select") {
-    val rule = "['age' > 16]"
-    val result = ruleParser.parseAll(ruleParser.selector, rule)
-    result.successful should be (true)
-    result.get.desc should be ("['age' > 16]")
-  }
-
-  test ("selection") {
-    val rule = "$source['age' > 16].func(1, 'abc')[1, 3, 'name'].time[*]"
-    val result = ruleParser.parseAll(ruleParser.selection, rule)
-    result.successful should be (true)
-    result.get.desc should be ("$source['age' > 16].func(1, 'abc')[1, 3, 'name']['time'][*]")
-  }
-
-  test ("math expr") {
-    val rule = "$source.age * 6 + 4 / 2"
-    val result = ruleParser.parseAll(ruleParser.mathExpr, rule)
-    result.successful should be (true)
-    result.get.desc should be ("$source['age'] * 6 + 4 / 2")
-  }
-
-  test ("range expr") {
-    val rule = "($source.age + 1, $target.age + 3, 40)"
-    val result = ruleParser.parseAll(ruleParser.rangeExpr, rule)
-    result.successful should be (true)
-    result.get.desc should be ("($source['age'] + 1, $target['age'] + 3, 40)")
-  }
-
-  test ("logical expr") {
-    val rule1 = "$source.age + 1 = $target.age"
-    val result1 = ruleParser.parseAll(ruleParser.logicalExpr, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("$source['age'] + 1 = $target['age']")
-
-    val rule2 = "$source.age in (3, 5, 6, 10)"
-    val result2 = ruleParser.parseAll(ruleParser.logicalExpr, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("$source['age'] in (3, 5, 6, 10)")
-  }
-
-  test ("logical statement") {
-    val rule1 = "$source.descs[0] = $target.desc AND $source.name = $target.name"
-    val result1 = ruleParser.parseAll(ruleParser.logicalStatement, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("$source['descs'][0] = $target['desc'] AND $source['name'] = $target['name']")
-
-    val rule2 = "NOT $source.age = $target.age"
-    val result2 = ruleParser.parseAll(ruleParser.logicalStatement, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("NOT $source['age'] = $target['age']")
-  }
-
-  test ("whole rule") {
-    val rule1 = "$source.name = $target.name AND $source.age = $target.age"
-    val result1 = ruleParser.parseAll(ruleParser.rule, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("$source['name'] = $target['name'] AND $source['age'] = $target['age']")
-
-    val rule2 = "$source.name = $target.name AND $source.age = $target.age WHEN $source.id > 1000"
-    val result2 = ruleParser.parseAll(ruleParser.rule, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("$source['name'] = $target['name'] AND $source['age'] = $target['age'] when $source['id'] > 1000")
-  }
-}

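RuleParser is built on Scala's parser combinators, which is what every parseAll call above exercises. A minimal standalone grammar covering a few of the literal cases (a sketch on top of JavaTokenParsers, not Griffin's actual grammar):

    import scala.util.parsing.combinator.JavaTokenParsers

    // Standalone parseAll sketch; Griffin's real RuleParser grammar is far richer.
    object LiteralSketch extends JavaTokenParsers {
      def literal: Parser[Option[Any]] =
        floatingPointNumber ^^ { s =>
          if (s.contains('.')) Some(s.toDouble) else Some(s.toLong)
        } |
        stringLiteral ^^ { s => Some(s.substring(1, s.length - 1)) } |
        "true" ^^^ Some(true) | "false" ^^^ Some(false) |
        "null" ^^^ None

      def main(args: Array[String]): Unit = {
        println(parseAll(literal, "12.3").get)  // Some(12.3)
        println(parseAll(literal, "null").get)  // None
      }
    }
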
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/utils/JsonUtilTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/utils/JsonUtilTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/utils/JsonUtilTest.scala
deleted file mode 100644
index 11c8b63..0000000
--- a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/utils/JsonUtilTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.griffin.measure.batch.utils
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-
-@RunWith(classOf[JUnitRunner])
-class JsonUtilTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val map = Map[String, Any](("name" -> "test"), ("age" -> 15))
-  val json = """{"name":"test","age":15}"""
-
-  val person = JsonUtilTest.Person("test", 15)
-
-  test ("toJson 1") {
-    val symbolMap = map.map(p => (Symbol(p._1), p._2))
-    JsonUtil.toJson(symbolMap) should equal (json)
-  }
-
-  test ("toJson 2") {
-    JsonUtil.toJson(map) should equal (json)
-  }
-
-  test ("toMap") {
-    JsonUtil.toMap(json) should equal (map)
-  }
-
-  test ("fromJson 1") {
-    JsonUtil.fromJson[JsonUtilTest.Person](json) should equal (person)
-  }
-
-  test ("fromJson 2") {
-    val is = new java.io.ByteArrayInputStream(json.getBytes("utf-8"));
-    JsonUtil.fromJson[JsonUtilTest.Person](is) should equal (person)
-  }
-
-}
-
-object JsonUtilTest {
-  case class Person(name: String, age: Int){}
-}

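A JsonUtil with these toJson/fromJson/toMap signatures is typically a thin wrapper over Jackson's Scala module. One plausible shape, offered as an assumption about the implementation rather than a copy of it:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Plausible Jackson-backed shape for the JsonUtil under test (assumed).
    object JsonUtilSketch {
      private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

      def toJson(value: Any): String = mapper.writeValueAsString(value)

      def toMap(json: String): Map[String, Any] = fromJson[Map[String, Any]](json)

      def fromJson[T](json: String)(implicit m: Manifest[T]): T =
        mapper.readValue(json, m.runtimeClass.asInstanceOf[Class[T]])
    }
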
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
index cf0e09b..67649c5 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -5,11 +5,7 @@
   <groupId>org.apache.griffin</groupId>
   <artifactId>measure</artifactId>
   <version>0.1.0-SNAPSHOT</version>
-  <packaging>pom</packaging>
-
-  <modules>
-    <module>measure-batch</module>
-  </modules>
+  <packaging>jar</packaging>
 
   <name>Apache Griffin :: Measures</name>
   <url>http://maven.apache.org</url>
@@ -149,15 +145,6 @@
 
   <build>
     <plugins>
-      <!--<plugin>-->
-        <!--<groupId>org.apache.maven.plugins</groupId>-->
-        <!--<artifactId>maven-compiler-plugin</artifactId>-->
-        <!--<configuration>-->
-          <!--&lt;!&ndash; or whatever version you use &ndash;&gt;-->
-          <!--<source>1.8</source>-->
-          <!--<target>1.8</target>-->
-        <!--</configuration>-->
-      <!--</plugin>-->
       <plugin>
         <groupId>org.scala-tools</groupId>
         <artifactId>maven-scala-plugin</artifactId>
@@ -186,7 +173,7 @@
           <descriptorRefs>
             <descriptorRef>jar-with-dependencies</descriptorRef>
           </descriptorRefs>
-          <finalName>griffin-measure-batch</finalName>
+          <finalName>griffin-measure</finalName>
           <appendAssemblyId>false</appendAssemblyId>
         </configuration>
         <executions>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/resources/config-old.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-old.json b/measure/src/main/resources/config-old.json
new file mode 100644
index 0000000..63dee69
--- /dev/null
+++ b/measure/src/main/resources/config-old.json
@@ -0,0 +1,45 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "source": {
+    "connector": {
+      "type": "hive",
+      "version": "1.2",
+      "config": {
+        "table.name": "users_info_src",
+        "partitions": "dt=20170410, hour=14"
+      }
+    }
+  },
+
+  "target": {
+    "connector": {
+      "type": "hive",
+      "version": "1.2",
+      "config": {
+        "database": "default",
+        "table.name": "users_info_target",
+        "partitions": "dt=20170410, hour=14; dt=20170410, hour=15"
+      }
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 1,
+    "assertion": {
+      "type": "DSL-griffin",
+      "rules": [
+        {
+          "rule": "@Key ${source}['user_id'] === ${target}['user_id']"
+        },
+        {
+          "rule": "${source}['first_name'] === ${target}['first_name']; ${source}['last_name'] === ${target}['last_name']; ${source}['address'] === ${target}['address']"
+        },
+        {
+          "rule": "${source}['email'] === ${target}['email']; ${source}['phone'] === ${target}['phone']; ${source}['post_code'] === ${target}['post_code']"
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config.json b/measure/src/main/resources/config.json
new file mode 100644
index 0000000..edd2e6a
--- /dev/null
+++ b/measure/src/main/resources/config.json
@@ -0,0 +1,29 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "source": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "database": "default",
+      "table.name": "users_info_src",
+      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+    }
+  },
+
+  "target": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "database": "default",
+      "table.name": "users_info_target",
+      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 0.2,
+    "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
+  }
+}
\ No newline at end of file

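In the partitions value above, semicolons separate partition specs and commas separate the column=value pairs within one spec. A hedged decomposition of that format (illustrative only; the Hive connector's actual parsing may differ):

    // Illustrative parsing of the "partitions" string format (assumed semantics).
    def parsePartitions(spec: String): Seq[Map[String, String]] =
      spec.split(";").toSeq.map { partition =>
        partition.split(",").flatMap { pair =>
          pair.split("=").map(_.trim) match {
            case Array(k, v) => Some(k -> v)
            case _           => None
          }
        }.toMap
      }

    // parsePartitions("dt=23123, hour=432; dt=35464, hour=4657") ==
    //   Seq(Map("dt" -> "23123", "hour" -> "432"), Map("dt" -> "35464", "hour" -> "4657"))
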
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/resources/env.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/env.json b/measure/src/main/resources/env.json
new file mode 100644
index 0000000..57da895
--- /dev/null
+++ b/measure/src/main/resources/env.json
@@ -0,0 +1,29 @@
+{
+  "spark": {
+    "log.level": "INFO",
+    "checkpoint.dir": "hdfs:///griffin/batch/cp",
+    "config": {}
+  },
+
+  "persist": [
+    {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///griffin/streaming/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
+      }
+    },
+    {
+      "type": "http",
+      "config": {
+        "method": "post",
+        "api": "http://HOSTNAME:9200/griffin/accuracy"
+      }
+    }
+  ],
+
+  "cleaner": {
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/log4j.properties b/measure/src/main/resources/log4j.properties
new file mode 100644
index 0000000..bd31e15
--- /dev/null
+++ b/measure/src/main/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/Application.scala
new file mode 100644
index 0000000..78f9271
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/Application.scala
@@ -0,0 +1,109 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch
+
+import org.apache.griffin.measure.batch.algo._
+import org.apache.griffin.measure.batch.config.params._
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.config.reader._
+import org.apache.griffin.measure.batch.config.validator.AllParamValidator
+import org.apache.griffin.measure.batch.log.Loggable
+
+import scala.util.{Failure, Success, Try}
+
+object Application extends Loggable {
+
+  def main(args: Array[String]): Unit = {
+    info(args.mkString(" "))
+    if (args.length < 2) {
+      error("Usage: class <env-param> <user-param> [comma-separated fs types: raw | local | hdfs (default)]")
+      sys.exit(-1)
+    }
+
+    val envParamFile = args(0)
+    val userParamFile = args(1)
+    val (envFsType, userFsType) = if (args.length > 2) {
+      val fsTypes = args(2).trim.split(",")
+      if (fsTypes.length == 1) (fsTypes(0).trim, fsTypes(0).trim)
+      else if (fsTypes.length >= 2) (fsTypes(0).trim, fsTypes(1).trim)
+      else ("hdfs", "hdfs")
+    } else ("hdfs", "hdfs")
+
+    info(envParamFile)
+    info(userParamFile)
+
+    // read param files
+    val envParam = readParamFile[EnvParam](envParamFile, envFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-2)
+      }
+    }
+    val userParam = readParamFile[UserParam](userParamFile, userFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-2)
+      }
+    }
+    val allParam: AllParam = AllParam(envParam, userParam)
+
+    // validate param files
+    validateParams(allParam) match {
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-3)
+      }
+      case _ => {
+        info("params validation pass")
+      }
+    }
+
+    // choose algorithm
+    val dqType = allParam.userParam.dqType
+    val algo: Algo = dqType match {
+      case MeasureType.accuracy() => BatchAccuracyAlgo(allParam)
+      case MeasureType.profile() => BatchProfileAlgo(allParam)
+      case _ => {
+        error(s"${dqType} is unsupported dq type!")
+        sys.exit(-4)
+      }
+    }
+
+    // algorithm run
+    algo.run match {
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-5)
+      }
+      case _ => {
+        info("calculation finished")
+      }
+    }
+  }
+
+  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+    paramReader.readConfig[T]
+  }
+
+  private def validateParams(allParam: AllParam): Try[Boolean] = {
+    val allParamValidator = AllParamValidator()
+    allParamValidator.validate(allParam)
+  }
+
+}

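The optional third argument above selects the filesystem type used to read each param file, defaulting to hdfs. The fallback logic from main, isolated as a pure function for clarity:

    // fs-type resolution from Application.main above, as a standalone sketch.
    def fsTypes(args: Array[String]): (String, String) =
      if (args.length > 2) {
        val ts = args(2).trim.split(",").map(_.trim)
        if (ts.length >= 2) (ts(0), ts(1)) else (ts(0), ts(0))
      } else ("hdfs", "hdfs")

    // fsTypes(Array("env.json", "config.json"))              == ("hdfs", "hdfs")
    // fsTypes(Array("env.json", "config.json", "local"))     == ("local", "local")
    // fsTypes(Array("env.json", "config.json", "local,raw")) == ("local", "raw")
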
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
new file mode 100644
index 0000000..16c65b0
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
@@ -0,0 +1,20 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo
+
+
+trait AccuracyAlgo extends Algo {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
new file mode 100644
index 0000000..9e19a18
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
@@ -0,0 +1,30 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo
+
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.log.Loggable
+
+import scala.util.Try
+
+trait Algo extends Loggable with Serializable {
+
+  val envParam: EnvParam
+  val userParam: UserParam
+
+  def run(): Try[_]
+
+}

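The Algo contract above is small: carry the two param objects and expose run(): Try[_]. A hypothetical no-op implementation, only to illustrate the shape a concrete algorithm must take:

    import org.apache.griffin.measure.batch.config.params.env.EnvParam
    import org.apache.griffin.measure.batch.config.params.user.UserParam
    import scala.util.Try

    // Hypothetical no-op Algo; BatchAccuracyAlgo below is the real thing.
    case class NoopAlgo(envParam: EnvParam, userParam: UserParam) extends Algo {
      def run(): Try[_] = Try { info("nothing to compute") }
    }
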
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
new file mode 100644
index 0000000..216c7c6
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
@@ -0,0 +1,179 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.algo.core.AccuracyCore
+import org.apache.griffin.measure.batch.config.params.AllParam
+import org.apache.griffin.measure.batch.connector._
+import org.apache.griffin.measure.batch.rule._
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.griffin.measure.batch.persist._
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.{Failure, Success, Try}
+
+// accuracy algorithm for batch mode
+case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
+  val envParam = allParam.envParam
+  val userParam = allParam.userParam
+
+  def run(): Try[_] = {
+    Try {
+      val metricName = userParam.name
+
+      val conf = new SparkConf().setAppName(metricName)
+      val sc = new SparkContext(conf)
+      val sqlContext = new HiveContext(sc)
+
+      // start time
+      val startTime = new Date().getTime()
+
+      // get persists to persist measure result
+      val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
+
+      // get spark application id
+      val applicationId = sc.applicationId
+
+      // persist start id
+      persist.start(applicationId)
+
+      // generate rule from rule param, generate rule analyzer
+      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+      val rule: StatementExpr = ruleFactory.generateRule()
+      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+      // const expr value map
+      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+
+      // data connector
+      val sourceDataConnector: DataConnector =
+        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
+        ) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("source data connection error!")
+          }
+          case Failure(ex) => throw ex
+        }
+      val targetDataConnector: DataConnector =
+        DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam,
+          ruleAnalyzer.targetRuleExprs, finalConstExprValueMap
+        ) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("target data connection error!")
+          }
+          case Failure(ex) => throw ex
+        }
+
+      // get metadata
+//      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
+//        case Success(md) => md
+//        case _ => throw new Exception("source metadata error!")
+//      }
+//      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
+//        case Success(md) => md
+//        case _ => throw new Exception("target metadata error!")
+//      }
+
+      // get data
+      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+      val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+
+      // accuracy algorithm
+      val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
+
+      // end time
+      val endTime = new Date().getTime
+      persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
+
+      // persist result
+      persist.result(endTime, accuResult)
+      val missingRecords = missingRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs))
+      persist.missRecords(missingRecords)
+
+      // persist end time
+      val persistEndTime = new Date().getTime
+      persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
+
+      // finish
+      persist.finish()
+
+      // context stop
+      sc.stop()
+
+    }
+  }
+
+  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
+    (data, Map[String, Any]())
+  }
+
+  // calculate accuracy between source data and target data
+  def accuracy(sourceData: RDD[(Product, Map[String, Any])], targetData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
+              ): (AccuracyResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
+
+    // 1. wrap data
+    val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2)))
+    val targetWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetData.map(r => (r._1, wrapInitData(r._2)))
+
+    // 2. cogroup
+    val allKvs = sourceWrappedData.cogroup(targetWrappedData)
+
+    // 3. accuracy calculation
+    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
+
+    (accuResult, missingRdd, matchedRdd)
+  }
+
+  // convert data into a string
+  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
+    val (key, (data, info)) = rec
+    val persistData = getPersistMap(data, sourcePersist)
+    val persistInfo = info.mapValues { value =>
+      value match {
+        case vd: Map[String, Any] @unchecked => getPersistMap(vd, targetPersist)
+        case v => v
+      }
+    }
+    s"${persistData} [${persistInfo}]"
+  }
+
+  // get the expr value map of the persist expressions
+  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
+    val persistMap = persist.map(e => (e._id, e.desc)).toMap
+    data.flatMap { pair =>
+      val (k, v) = pair
+      persistMap.get(k) match {
+        case Some(d) => Some((d -> v))
+        case _ => None
+      }
+    }
+  }
+
+}
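
A usage sketch for this class, assuming an AllParam instance has already been
deserialized from the env and user config files (the parsing entry point is
not part of this hunk):

    import scala.util.{Failure, Success}

    BatchAccuracyAlgo(allParam).run() match {
      case Success(_)  => println("accuracy measure finished")
      case Failure(ex) => println(s"accuracy measure failed: ${ex.getMessage}")
    }

Since run() wraps the whole Spark job in Try, connector and rule failures
surface here as Failure values rather than uncaught exceptions.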

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
new file mode 100644
index 0000000..9c0f48b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
@@ -0,0 +1,151 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.algo.core.ProfileCore
+import org.apache.griffin.measure.batch.config.params._
+import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
+import org.apache.griffin.measure.batch.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+import org.apache.griffin.measure.batch.rule.expr.{Expr, StatementExpr}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.{Failure, Success, Try}
+
+// profile algorithm for batch mode
+case class BatchProfileAlgo(allParam: AllParam) extends ProfileAlgo {
+  val envParam = allParam.envParam
+  val userParam = allParam.userParam
+
+  def run(): Try[_] = {
+    Try {
+      val metricName = userParam.name
+
+      val conf = new SparkConf().setAppName(metricName)
+      val sc = new SparkContext(conf)
+      val sqlContext = new HiveContext(sc)
+
+      // start time
+      val startTime = new Date().getTime()
+
+      // get persists to persist measure result
+      val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
+
+      // get spark application id
+      val applicationId = sc.applicationId
+
+      // persist start id
+      persist.start(applicationId)
+
+      // generate rule from rule param, generate rule analyzer
+      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+      val rule: StatementExpr = ruleFactory.generateRule()
+      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+      // const expr value map
+      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+
+      // data connector
+      val sourceDataConnector: DataConnector =
+        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
+        ) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("source data connection error!")
+          }
+          case Failure(ex) => throw ex
+        }
+
+      // get metadata
+      //      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
+      //        case Success(md) => md
+      //        case _ => throw new Exception("source metadata error!")
+      //      }
+
+      // get data
+      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+
+      // profile algorithm
+      val (profileResult, missingRdd, matchedRdd) = profile(sourceData, ruleAnalyzer)
+
+      // end time
+      val endTime = new Date().getTime
+      persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
+
+      // persist result
+      persist.result(endTime, profileResult)
+      val matchedRecords = matchedRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs))
+      persist.matchRecords(matchedRecords)
+
+      // persist end time
+      val persistEndTime = new Date().getTime
+      persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
+
+      // finish
+      persist.finish()
+
+      // context stop
+      sc.stop()
+    }
+  }
+
+  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
+    (data, Map[String, Any]())
+  }
+
+  // calculate profile from source data
+  def profile(sourceData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
+              ): (ProfileResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
+
+    // 1. wrap data
+    val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2)))
+
+    // 2. profile calculation
+    val (profileResult, missingRdd, matchedRdd) = ProfileCore.profile(sourceWrappedData, ruleAnalyzer)
+
+    (profileResult, missingRdd, matchedRdd)
+  }
+
+  // convert data into a string
+  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr]): String = {
+    val (key, (data, info)) = rec
+    val persistData = getPersistMap(data, sourcePersist)
+    val persistInfo = info
+    if (persistInfo.nonEmpty) s"${persistData} [${persistInfo}]" else s"${persistData}"
+  }
+
+  // get the expr value map of the persist expressions
+  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
+    val persistMap = persist.map(e => (e._id, e.desc)).toMap
+    data.flatMap { pair =>
+      val (k, v) = pair
+      persistMap.get(k) match {
+        case Some(d) => Some((d -> v))
+        case _ => None
+      }
+    }
+  }
+
+}
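
getPersistMap (the same helper appears in both batch algos) simply renames
cached expression ids to their descriptions and drops non-persisted entries;
a self-contained illustration with made-up ids and descriptions:

    val persistMap = Map("_expr_0" -> "user.name", "_expr_1" -> "user.age")
    val data = Map("_expr_0" -> "Tom001", "_expr_1" -> 23, "_tmp" -> true)
    val renamed = data.flatMap { case (k, v) => persistMap.get(k).map(_ -> v) }
    // renamed == Map("user.name" -> "Tom001", "user.age" -> 23); "_tmp" is dropped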

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
new file mode 100644
index 0000000..aa14ac7
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
@@ -0,0 +1,22 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo
+
+object MeasureType {
+
+  val accuracy = """^(?i)accuracy$""".r
+  val profile = """^(?i)profile$""".r
+
+}
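
These patterns are intended for case-insensitive dispatch on the measure type
string from the user config; a hypothetical dispatcher (not part of this
commit) might look like:

    def algoOf(dqType: String, allParam: AllParam): Option[Algo] = dqType match {
      case MeasureType.accuracy() => Some(BatchAccuracyAlgo(allParam))
      case MeasureType.profile()  => Some(BatchProfileAlgo(allParam))
      case _                      => None
    }

In a Scala pattern match a regex must cover the whole string, so "Accuracy"
matches while "accuracy2" does not.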

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
new file mode 100644
index 0000000..418cce0
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
@@ -0,0 +1,19 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo
+
+trait ProfileAlgo extends Algo {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
new file mode 100644
index 0000000..f37c68e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
@@ -0,0 +1,97 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo.core
+
+import org.apache.griffin.measure.batch.rule.RuleAnalyzer
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+
+
+object AccuracyCore {
+
+  type V = Map[String, Any]
+  type T = Map[String, Any]
+
+  // allKvs: rdd of (key, (Iterable[(sourceData, sourceInfo)], Iterable[(targetData, targetInfo)]))
+  // output: accuracy result, missing source data rdd, matched source data rdd
+  def accuracy(allKvs: RDD[(Product, (Iterable[(V, T)], Iterable[(V, T)]))], ruleAnalyzer: RuleAnalyzer
+              ): (AccuracyResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
+    val result: RDD[(Long, Long, List[(Product, (V, T))], List[(Product, (V, T))])] = allKvs.map { kv =>
+      val (key, (sourceDatas, targetDatas)) = kv
+
+      // result: (missCount, matchCount, missDataList, matchDataList)
+      val rslt = sourceDatas.foldLeft((0L, 0L, List[(Product, (V, T))](), List[(Product, (V, T))]())) { (sr, sourcePair) =>
+        val matchResult = if (targetDatas.isEmpty) {
+          (false, Map[String, Any](MismatchInfo.wrap("no target")))
+        } else {
+          targetDatas.foldLeft((false, Map[String, Any]())) { (tr, targetPair) =>
+            if (tr._1) tr
+            else matchData(sourcePair, targetPair, ruleAnalyzer)
+          }
+        }
+
+        if (matchResult._1) {
+          val matchItem = (key, sourcePair)
+          (sr._1, sr._2 + 1, sr._3, sr._4 :+ matchItem)
+        } else {
+          val missItem = (key, (sourcePair._1, sourcePair._2 ++ matchResult._2))
+          (sr._1 + 1, sr._2, sr._3 :+ missItem, sr._4)
+        }
+      }
+
+      rslt
+    }
+
+    val missRdd = result.flatMap(_._3)
+    val matchRdd = result.flatMap(_._4)
+
+    def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = {
+      (cnt._1 + rcd._1, cnt._2 + rcd._2)
+    }
+    def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = {
+      (c1._1 + c2._1, c1._2 + c2._2)
+    }
+    val countPair = result.aggregate((0L, 0L))(seq, comb)
+
+    (AccuracyResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd)
+  }
+
+  // try to match source and target data, return true if matched, false if unmatched, also with some matching info
+  private def matchData(source: (V, T), target: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
+
+    // 1. merge source and target cached data
+    val mergedExprValueMap: Map[String, Any] = mergeExprValueMap(source, target)
+
+    // 2. check valid
+    if (ruleAnalyzer.rule.valid(mergedExprValueMap)) {
+      // 3. substitute the cached data into statement, get the statement value
+      val matched = ruleAnalyzer.rule.calculate(mergedExprValueMap) match {
+        case Some(b: Boolean) => b
+        case _ => false
+      }
+      // currently we cannot get the mismatch reason; more match info is needed to figure out why it mismatches
+      if (matched) (matched, Map[String, Any]())
+      else (matched, Map[String, Any](MismatchInfo.wrap("not matched"), TargetInfo.wrap(target._1)))
+    } else {
+      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare"), TargetInfo.wrap(target._1)))
+    }
+
+  }
+
+  private def mergeExprValueMap(source: (V, T), target: (V, T)): Map[String, Any] = {
+    source._1 ++ target._1
+  }
+
+}
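
Stripped of Spark and the rule engine, the per-key fold above counts a source
record as matched when any co-grouped target record satisfies the rule; a
plain-Scala reduction of that bookkeeping, with equality standing in for rule
evaluation:

    val sources = Seq(Map("id" -> 1), Map("id" -> 2))
    val targets = Seq(Map("id" -> 1))
    val (missCount, matchCount) = sources.foldLeft((0L, 0L)) { case ((ms, mt), s) =>
      if (targets.contains(s)) (ms, mt + 1) else (ms + 1, mt)
    }
    // missCount == 1, matchCount == 1
    // AccuracyResult(missCount, missCount + matchCount) => 1 missing of 2 total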

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
new file mode 100644
index 0000000..dbe96b2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
@@ -0,0 +1,69 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo.core
+
+import org.apache.griffin.measure.batch.rule.RuleAnalyzer
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+
+
+object ProfileCore {
+
+  type V = Map[String, Any]
+  type T = Map[String, Any]
+
+  // dataRdd: rdd of (key, (sourceData, sourceInfo))
+  // output: profile result, missing source data rdd, matched source data rdd
+  def profile(dataRdd: RDD[(Product, (V, T))], ruleAnalyzer: RuleAnalyzer
+              ): (ProfileResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
+
+    val resultRdd: RDD[((Product, (V, T)), Boolean)] = dataRdd.map { kv =>
+      val (key, (data, info)) = kv
+      val (matched, missInfo) = matchData((data, info), ruleAnalyzer)
+      ((key, (data, info ++ missInfo)), matched)
+    }
+
+    val totalCount = resultRdd.count
+    val matchRdd = resultRdd.filter(_._2).map(_._1)
+    val matchCount = matchRdd.count
+    val missRdd = resultRdd.filter(!_._2).map(_._1)
+    val missCount = missRdd.count
+
+    (ProfileResult(matchCount, totalCount), missRdd, matchRdd)
+
+  }
+
+  // try to match data as rule, return true if matched, false if unmatched
+  private def matchData(dataPair: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
+
+    val data: Map[String, Any] = dataPair._1
+
+    // 1. check valid
+    if (ruleAnalyzer.rule.valid(data)) {
+      // 2. substitute the cached data into statement, get the statement value
+      val matched = ruleAnalyzer.rule.calculate(data) match {
+        case Some(b: Boolean) => b
+        case _ => false
+      }
+      // currently we cannot get the mismatch reason; more match info is needed to figure out why it mismatches
+      if (matched) (matched, Map[String, Any]())
+      else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
+    } else {
+      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
+    }
+
+  }
+
+}
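
Here each record is evaluated independently, so the match/total bookkeeping
reduces to a count over a predicate; a sketch with a hypothetical rule ("age
must be non-negative") standing in for ruleAnalyzer:

    val records = Seq(Map("age" -> 23), Map("age" -> -1), Map("age" -> 35))
    val matches = (r: Map[String, Any]) => r.get("age") match {
      case Some(i: Int) => i >= 0
      case _            => false
    }
    val matchCount = records.count(matches)
    // ProfileResult(matchCount, records.size) => 2 of 3 records pass the rule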

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
new file mode 100644
index 0000000..4e2d5d8
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
@@ -0,0 +1,28 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+
+// simply composite of env and user params, for convenient usage
+@JsonInclude(Include.NON_NULL)
+case class AllParam( @JsonProperty("env") envParam: EnvParam,
+                     @JsonProperty("user") userParam: UserParam
+                   ) extends Param {
+
+}
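
With these Jackson annotations, an AllParam can be read from JSON via
jackson-module-scala; a sketch assuming that module is on the classpath
(nothing in this hunk wires up the ObjectMapper itself):

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val jsonString = """{"env": null, "user": null}"""  // placeholder payload
    val allParam = mapper.readValue(jsonString, classOf[AllParam])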

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
new file mode 100644
index 0000000..437ac67
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
@@ -0,0 +1,21 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.params
+
+trait Param extends Serializable {
+
+  def validate(): Boolean = true
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
new file mode 100644
index 0000000..4896932
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
@@ -0,0 +1,24 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class CleanerParam() extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
new file mode 100644
index 0000000..a2aa7a5
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
@@ -0,0 +1,27 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam,
+                     @JsonProperty("persist") persistParams: List[PersistParam],
+                     @JsonProperty("cleaner") cleanerParam: CleanerParam
+                   ) extends Param {
+
+}
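
Given the @JsonProperty names here and in the neighboring param classes, a
hand-built EnvParam for testing might look like the following (all values are
illustrative; SparkParam is defined in a later hunk of this commit):

    val envParam = EnvParam(
      sparkParam = SparkParam("INFO", "hdfs:///griffin/cp", Map("spark.master" -> "yarn-client")),
      persistParams = List(PersistParam("hdfs", Map("path" -> "hdfs:///griffin/persist"))),
      cleanerParam = CleanerParam()
    )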

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
new file mode 100644
index 0000000..aaffda5
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
@@ -0,0 +1,26 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class PersistParam( @JsonProperty("type") persistType: String,
+                         @JsonProperty("config") config: Map[String, Any]
+                       ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
new file mode 100644
index 0000000..9979f19
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
@@ -0,0 +1,27 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class SparkParam( @JsonProperty("log.level") logLevel: String,
+                       @JsonProperty("checkpoint.dir") cpDir: String,
+                       @JsonProperty("config") config: Map[String, Any]
+                     ) extends Param {
+
+}