Posted to commits@griffin.apache.org by gu...@apache.org on 2017/05/04 03:04:12 UTC

[04/51] [partial] incubator-griffin git commit: refactor arch

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
new file mode 100644
index 0000000..fec8c49
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
@@ -0,0 +1,265 @@
+package org.apache.griffin.measure.batch.utils
+
+import scala.util.{Success, Try}
+
+
+object CalculationUtil {
+
+  implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v)
+
+  case class CalculationValue(value: Option[_]) extends Serializable {
+
+    def + (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
+          case (None, Some(v2)) => other
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def - (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def * (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
+          case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def / (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def % (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def unary_- (): Option[_] = {
+      value match {
+        case Some(v: String) => Some(v.reverse)
+        case Some(v: Boolean) => Some(!v)
+        case Some(v: Byte) => Some(-v)
+        case Some(v: Short) => Some(-v)
+        case Some(v: Int) => Some(-v)
+        case Some(v: Long) => Some(-v)
+        case Some(v: Float) => Some(-v)
+        case Some(v: Double) => Some(-v)
+        case Some(v) => Some(v)
+        case _ => None
+      }
+    }
+
+
+    def === (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (Some(v1), Some(v2)) => Some(v1 == v2)
+        case _ => None
+      }
+    }
+
+    def =!= (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (Some(v1), Some(v2)) => Some(v1 != v2)
+        case _ => None
+      }
+    }
+
+    def > (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 > v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def >= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def < (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def <= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+
+    def in (other: Iterable[Option[_]]): Option[Boolean] = {
+      other.foldLeft(Some(false): Option[Boolean]) { (res, next) =>
+        optOr(res, ===(next))
+      }
+    }
+
+    def not_in (other: Iterable[Option[_]]): Option[Boolean] = {
+      other.foldLeft(Some(true): Option[Boolean]) { (res, next) =>
+        optAnd(res, =!=(next))
+      }
+    }
+
+    def between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        optAnd(>=(begin), <=(end))
+      }
+    }
+
+    def not_between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        optOr(<(begin), >(end))
+      }
+    }
+
+    def unary_! (): Option[Boolean] = {
+      optNot(value)
+    }
+
+    def && (other: Option[_]): Option[Boolean] = {
+      optAnd(value, other)
+    }
+
+    def || (other: Option[_]): Option[Boolean] = {
+      optOr(value, other)
+    }
+
+
+    private def optNot(a: Option[_]): Option[Boolean] = {
+      a match {
+        case Some(v: Boolean) => Some(!v)
+        case _ => None
+      }
+    }
+    private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = {
+      (a, b) match {
+        case (Some(false), _) | (_, Some(false)) => Some(false)
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
+        case _ => None
+      }
+    }
+    private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = {
+      (a, b) match {
+        case (Some(true), _) | (_, Some(true)) => Some(true)
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
+        case _ => None
+      }
+    }
+  }
+
+}
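
A minimal usage sketch of the operators above (the values and the enclosing demo object are illustrative; it assumes only that the implicit option2CalculationValue conversion is in scope):

    import org.apache.griffin.measure.batch.utils.CalculationUtil._

    object CalculationUtilDemo extends App {
      val a: Option[_] = Some(3)
      val b: Option[_] = Some(4)

      println(a + b)                         // Some(7): Int addition via the implicit conversion
      println(a * b)                         // Some(12)
      println(a > b)                         // Some(false): numeric comparison goes through Double
      println(None + b)                      // Some(4): a missing left operand yields the right one
      println(a in Seq(Some(3), Some(5)))    // Some(true)
      println((a === b) || (a < b))          // Some(true): three-valued boolean logic on Options
    }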

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
new file mode 100644
index 0000000..b48478a
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
@@ -0,0 +1,62 @@
+package org.apache.griffin.measure.batch.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
+
+object HdfsUtil {
+
+  private val separator = "/"
+
+  private val conf = new Configuration()
+  conf.set("dfs.support.append", "true")
+
+  private val dfs = FileSystem.get(conf)
+
+  def existPath(filePath: String): Boolean = {
+    val path = new Path(filePath)
+    dfs.exists(path)
+  }
+
+  def createFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+    dfs.create(path)
+  }
+
+  def appendOrCreateFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
+  }
+
+  def openFile(filePath: String): FSDataInputStream = {
+    val path = new Path(filePath)
+    dfs.open(path)
+  }
+
+  def writeContent(filePath: String, message: String): Unit = {
+    val out = createFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close()
+  }
+
+  def appendContent(filePath: String, message: String): Unit = {
+    val out = appendOrCreateFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close()
+  }
+
+  def createEmptyFile(filePath: String): Unit = {
+    val out = createFile(filePath)
+    out.close()
+  }
+
+
+  def getHdfsFilePath(parentPath: String, fileName: String): String = {
+    if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName
+  }
+
+  def deleteHdfsPath(dirPath: String): Unit = {
+    val path = new Path(dirPath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+  }
+}
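
A usage sketch (the paths are illustrative; FileSystem.get(conf) resolves against whatever Hadoop configuration is on the classpath, and append requires a file system that actually supports it, as plain HDFS does — the local-filesystem fallback may not):

    import org.apache.griffin.measure.batch.utils.HdfsUtil

    object HdfsUtilDemo extends App {
      val dir = "/tmp/griffin-demo"
      val file = HdfsUtil.getHdfsFilePath(dir, "result.txt")   // "/tmp/griffin-demo/result.txt"

      HdfsUtil.writeContent(file, "total: 50\n")    // create or overwrite
      HdfsUtil.appendContent(file, "miss: 2\n")     // append, creating the file if absent
      println(HdfsUtil.existPath(file))             // true
      HdfsUtil.deleteHdfsPath(dir)                  // recursive delete
    }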

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
new file mode 100644
index 0000000..747d0fa
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
@@ -0,0 +1,30 @@
+package org.apache.griffin.measure.batch.utils
+
+import scalaj.http._
+
+object HttpUtil {
+
+  val GET_REGEX = """^(?i)get$""".r
+  val POST_REGEX = """^(?i)post$""".r
+  val PUT_REGEX = """^(?i)put$""".r
+  val DELETE_REGEX = """^(?i)delete$""".r
+
+  def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
+    val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
+    response.code.toString
+  }
+
+  def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
+    val httpReq = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers))
+    method match {
+      case GET_REGEX() => httpReq.asString.code.toString
+      case POST_REGEX() => httpReq.postData(data).asString.code.toString
+      case PUT_REGEX() => httpReq.put(data).asString.code.toString
+      case DELETE_REGEX() => httpReq.method("DELETE").asString.code.toString
+      case _ => "wrong method"
+    }
+  }
+
+  private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = {
+    map.map(pair => pair._1 -> pair._2.toString)
+  }
+
+}
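
A sketch of a call (the endpoint and payload are made up; the return value is the HTTP status code rendered as a string):

    import org.apache.griffin.measure.batch.utils.HttpUtil

    object HttpUtilDemo extends App {
      val url = "http://localhost:8080/metrics"
      val headers: Map[String, Object] = Map("Content-Type" -> "application/json")

      // Method matching is case-insensitive ("post", "POST", ...).
      val code = HttpUtil.httpRequest(url, "POST", Map.empty[String, Object], headers,
        """{"name": "accu1", "total": 50}""")
      println(code)   // e.g. "200"; unsupported verbs yield "wrong method"
    }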

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
new file mode 100644
index 0000000..cdd470a
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
@@ -0,0 +1,32 @@
+package org.apache.griffin.measure.batch.utils
+
+import java.io.InputStream
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import scala.reflect._
+
+object JsonUtil {
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
+  def toJson(value: Map[Symbol, Any]): String = {
+    toJson(value map { case (k,v) => k.name -> v})
+  }
+
+  def toJson(value: Any): String = {
+    mapper.writeValueAsString(value)
+  }
+
+  def toMap[V](json: String)(implicit m: Manifest[V]): Map[String, V] = fromJson[Map[String, V]](json)
+
+  def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = {
+    mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]])
+  }
+
+  def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = {
+    mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]])
+  }
+}
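
For example (RuleParam is a hypothetical shape, standing in for the real param case classes):

    import org.apache.griffin.measure.batch.utils.JsonUtil

    case class RuleParam(name: String, sampleRatio: Double)

    object JsonUtilDemo extends App {
      val json = JsonUtil.toJson(Map("name" -> "accu1", "sampleRatio" -> 1.0))
      println(json)                                    // {"name":"accu1","sampleRatio":1.0}

      val param = JsonUtil.fromJson[RuleParam](json)   // bind back into a case class
      println(param)                                   // RuleParam(accu1,1.0)

      val map = JsonUtil.toMap[Any](json)              // or into a Map[String, Any]
      println(map("name"))                             // accu1
    }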

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
new file mode 100644
index 0000000..7f2b355
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.measure.batch.utils
+
+object StringParseUtil {
+
+  def sepStrings(str: String, sep: String): Iterable[String] = {
+    val strings = str.split(sep)
+    strings.map(_.trim)
+  }
+
+}
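
For example (input echoing the partitions strings in the test configs below):

    import org.apache.griffin.measure.batch.utils.StringParseUtil

    object StringParseUtilDemo extends App {
      // split on the separator, then trim each piece
      val parts = StringParseUtil.sepStrings("dt=20170410, hour=15", ",")
      println(parts.toList)   // List(dt=20170410, hour=15)
    }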

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/config.json b/measure/measure-batch/src/test/resources/config.json
new file mode 100644
index 0000000..65e0ed9
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/config.json
@@ -0,0 +1,25 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "source": {
+    "type": "avro",
+    "version": "1.7",
+    "config": {
+      "file.name": "src/test/resources/users_info_src.avro"
+    }
+  },
+
+  "target": {
+    "type": "avro",
+    "version": "1.7",
+    "config": {
+      "file.name": "src/test/resources/users_info_target.avro"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 1,
+    "rules": "$source.user_id > 10020 AND $source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/config1.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/config1.json b/measure/measure-batch/src/test/resources/config1.json
new file mode 100644
index 0000000..d7290ba
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/config1.json
@@ -0,0 +1,27 @@
+{
+  "name": "accu-test",
+  "type": "accuracy",
+
+  "source": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "table.name": "rheos_view_event",
+      "partitions": "dt=20170410, hour=15"
+    }
+  },
+
+  "target": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "table.name": "be_view_event_queue",
+      "partitions": "dt=20170410, hour=15; dt=20170410, hour=16"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 1,
+    "rules": "@Key ${source}['uid'] === ${target}['uid']; @Key ${source}['eventtimestamp'] === ${target}['eventtimestamp']; ${source}['page_id'] === ${target}['page_id']; ${source}['site_id'] === ${target}['site_id']; ${source}['itm'] === ${target}['itm']"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/env.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/env.json b/measure/measure-batch/src/test/resources/env.json
new file mode 100644
index 0000000..3a9e38c
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/env.json
@@ -0,0 +1,27 @@
+{
+  "spark": {
+    "log.level": "ERROR",
+    "checkpoint.dir": "hdfs:///griffin/batch/cp",
+    "config": {}
+  },
+
+  "persist": [
+    {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///griffin/streaming/persist"
+      }
+    },
+    {
+      "type": "http",
+      "config": {
+        "method": "post",
+        "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/"
+      }
+    }
+  ],
+
+  "cleaner": {
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/env1.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/env1.json b/measure/measure-batch/src/test/resources/env1.json
new file mode 100644
index 0000000..a059715
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/env1.json
@@ -0,0 +1,21 @@
+{
+  "spark": {
+    "log.level": "INFO",
+    "checkpoint.dir": "hdfs:///griffin/batch/cp",
+    "config": {}
+  },
+
+  "persist": [
+    {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///user/b_des/bark/griffin-batch/test",
+        "max.lines.per.file": 10000
+      }
+    }
+  ],
+
+  "cleaner": {
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/log4j.properties b/measure/measure-batch/src/test/resources/log4j.properties
new file mode 100644
index 0000000..bd31e15
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_src.avro
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_src.avro b/measure/measure-batch/src/test/resources/users_info_src.avro
new file mode 100644
index 0000000..3d5c939
Binary files /dev/null and b/measure/measure-batch/src/test/resources/users_info_src.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_src.dat
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_src.dat b/measure/measure-batch/src/test/resources/users_info_src.dat
new file mode 100644
index 0000000..ce49443
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/users_info_src.dat
@@ -0,0 +1,50 @@
+10001|Tom001|Jerrya|201 DisneyCity|tomajerrya001@dc.org|10000001|94022
+10002|Tom002|Jerrya|202 DisneyCity|tomajerrya002@dc.org|10000002|94022
+10003|Tom003|Jerrya|203 DisneyCity|tomajerrya003@dc.org|10000003|94022
+10004|Tom004|Jerrya|204 DisneyCity|tomajerrya004@dc.org|10000004|94022
+10005|Tom005|Jerrya|205 DisneyCity|tomajerrya005@dc.org|10000005|94022
+10006|Tom006|Jerrya|206 DisneyCity|tomajerrya006@dc.org|10000006|94022
+10007|Tom007|Jerrya|207 DisneyCity|tomajerrya007@dc.org|10000007|94022
+10008|Tom008|Jerrya|208 DisneyCity|tomajerrya008@dc.org|10000008|94022
+10009|Tom009|Jerrya|209 DisneyCity|tomajerrya009@dc.org|10000009|94022
+10010|Tom010|Jerrya|210 DisneyCity|tomajerrya010@dc.org|10000010|94022
+10011|Tom011|Jerrya|211 DisneyCity|tomajerrya011@dc.org|10000011|94022
+10012|Tom012|Jerrya|212 DisneyCity|tomajerrya012@dc.org|10000012|94022
+10013|Tom013|Jerrya|213 DisneyCity|tomajerrya013@dc.org|10000013|94022
+10014|Tom014|Jerrya|214 DisneyCity|tomajerrya014@dc.org|10000014|94022
+10015|Tom015|Jerrya|215 DisneyCity|tomajerrya015@dc.org|10000015|94022
+10016|Tom016|Jerrya|216 DisneyCity|tomajerrya016@dc.org|10000016|94022
+10017|Tom017|Jerrya|217 DisneyCity|tomajerrya017@dc.org|10000017|94022
+10018|Tom018|Jerrya|218 DisneyCity|tomajerrya018@dc.org|10000018|94022
+10019|Tom019|Jerrya|219 DisneyCity|tomajerrya019@dc.org|10000019|94022
+10020|Tom020|Jerrya|220 DisneyCity|tomajerrya020@dc.org|10000020|94022
+10021|Tom021|Jerrya|221 DisneyCity|tomajerrya021@dc.org|10000021|94022
+10022|Tom022|Jerrya|222 DisneyCity|tomajerrya022@dc.org|10000022|94022
+10023|Tom023|Jerrya|223 DisneyCity|tomajerrya023@dc.org|10000023|94022
+10024|Tom024|Jerrya|224 DisneyCity|tomajerrya024@dc.org|10000024|94022
+10025|Tom025|Jerrya|225 DisneyCity|tomajerrya025@dc.org|10000025|94022
+10026|Tom026|Jerrya|226 DisneyCity|tomajerrya026@dc.org|10000026|94022
+10027|Tom027|Jerrya|227 DisneyCity|tomajerrya027@dc.org|10000027|94022
+10028|Tom028|Jerrya|228 DisneyCity|tomajerrya028@dc.org|10000028|94022
+10029|Tom029|Jerrya|229 DisneyCity|tomajerrya029@dc.org|10000029|94022
+10030|Tom030|Jerrya|230 DisneyCity|tomajerrya030@dc.org|10000030|94022
+10031|Tom031|Jerrya|231 DisneyCity|tomajerrya031@dc.org|10000031|94022
+10032|Tom032|Jerrya|232 DisneyCity|tomajerrya032@dc.org|10000032|94022
+10033|Tom033|Jerrya|233 DisneyCity|tomajerrya033@dc.org|10000033|94022
+10034|Tom034|Jerrya|234 DisneyCity|tomajerrya034@dc.org|10000034|94022
+10035|Tom035|Jerrya|235 DisneyCity|tomajerrya035@dc.org|10000035|94022
+10036|Tom036|Jerrya|236 DisneyCity|tomajerrya036@dc.org|10000036|94022
+10037|Tom037|Jerrya|237 DisneyCity|tomajerrya037@dc.org|10000037|94022
+10038|Tom038|Jerrya|238 DisneyCity|tomajerrya038@dc.org|10000038|94022
+10039|Tom039|Jerrya|239 DisneyCity|tomajerrya039@dc.org|10000039|94022
+10040|Tom040|Jerrya|240 DisneyCity|tomajerrya040@dc.org|10000040|94022
+10041|Tom041|Jerrya|241 DisneyCity|tomajerrya041@dc.org|10000041|94022
+10042|Tom042|Jerrya|242 DisneyCity|tomajerrya042@dc.org|10000042|94022
+10043|Tom043|Jerrya|243 DisneyCity|tomajerrya043@dc.org|10000043|94022
+10044|Tom044|Jerrya|244 DisneyCity|tomajerrya044@dc.org|10000044|94022
+10045|Tom045|Jerrya|245 DisneyCity|tomajerrya045@dc.org|10000045|94022
+10046|Tom046|Jerrya|246 DisneyCity|tomajerrya046@dc.org|10000046|94022
+10047|Tom047|Jerrya|247 DisneyCity|tomajerrya047@dc.org|10000047|94022
+10048|Tom048|Jerrya|248 DisneyCity|tomajerrya048@dc.org|10000048|94022
+10049|Tom049|Jerrya|249 DisneyCity|tomajerrya049@dc.org|10000049|94022
+10050|Tom050|Jerrya|250 DisneyCity|tomajerrya050@dc.org|10000050|94022
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_target.avro
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_target.avro b/measure/measure-batch/src/test/resources/users_info_target.avro
new file mode 100644
index 0000000..104dd6c
Binary files /dev/null and b/measure/measure-batch/src/test/resources/users_info_target.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_target.dat
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_target.dat b/measure/measure-batch/src/test/resources/users_info_target.dat
new file mode 100644
index 0000000..07a6b40
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/users_info_target.dat
@@ -0,0 +1,50 @@
+10001|Tom001|Jerrya|201 DisneyCity|tomajerrya001@dc.org|10000101|94022
+10002|Tom002|Jerrya|202 DisneyCity|tomajerrya002@dc.org|10000102|94022
+10003|Tom003|Jerrya|203 DisneyCity|tomajerrya003@dc.org|10000003|94022
+10004|Tom004|Jerrya|204 DisneyCity|tomajerrya004@dc.org|10000004|94022
+10005|Tom005|Jerrya|205 DisneyCity|tomajerrya005@dc.org|10000005|94022
+10006|Tom006|Jerrya|206 DisneyCity|tomajerrya006@dc.org|10000006|94022
+10007|Tom007|Jerrya|207 DisneyCity|tomajerrya007@dc.org|10000007|94022
+10008|Tom008|Jerrya|208 DisneyCity|tomajerrya008@dc.org|10000008|94022
+10009|Tom009|Jerrya|209 DisneyCity|tomajerrya009@dc.org|10000009|94022
+10010|Tom010|Jerrya|210 DisneyCity|tomajerrya010@dc.org|10000010|94022
+10011|Tom011|Jerrya|211 DisneyCity|tomajerrya011@dc.org|10000011|94022
+10012|Tom012|Jerrya|212 DisneyCity|tomajerrya012@dc.org|10000012|94022
+10013|Tom013|Jerrya|213 DisneyCity|tomajerrya013@dc.org|10000013|94022
+10014|Tom014|Jerrya|214 DisneyCity|tomajerrya014@dc.org|10000014|94022
+10015|Tom015|Jerrya|215 DisneyCity|tomajerrya015@dc.org|10000015|94022
+10016|Tom016|Jerrya|216 DisneyCity|tomajerrya016@dc.org|10000016|94022
+10017|Tom017|Jerrya|217 DisneyCity|tomajerrya017@dc.org|10000017|94022
+10018|Tom018|Jerrya|218 DisneyCity|tomajerrya018@dc.org|10000018|94022
+10019|Tom019|Jerrya|219 DisneyCity|tomajerrya019@dc.org|10000019|94022
+10020|Tom020|Jerrya|220 DisneyCity|tomajerrya020@dc.org|10000020|94022
+10021|Tom021|Jerrya|221 DisneyCity|tomajerrya021@dc.org|10000021|94022
+10022|Tom022|Jerrya|222 DisneyCity|tomajerrya022@dc.org|10000022|94022
+10023|Tom023|Jerrya|223 DisneyCity|tomajerrya023@dc.org|10000023|94022
+10024|Tom024|Jerrya|224 DisneyCity|tomajerrya024@dc.org|10000024|94022
+10025|Tom025|Jerrya|225 DisneyCity|tomajerrya025@dc.org|10000025|94022
+10026|Tom026|Jerrya|226 DisneyCity|tomajerrya026@dc.org|10000026|94022
+10027|Tom027|Jerrya|227 DisneyCity|tomajerrya027@dc.org|10000027|94022
+10028|Tom028|Jerrya|228 DisneyCity|tomajerrya028@dc.org|10000028|94022
+10029|Tom029|Jerrya|229 DisneyCity|tomajerrya029@dc.org|10000029|94022
+10030|Tom030|Jerrya|230 DisneyCity|tomajerrya030@dc.org|10000030|94022
+10031|Tom031|Jerrya|231 DisneyCity|tomajerrya031@dc.org|10000031|94022
+10032|Tom032|Jerrya|232 DisneyCity|tomajerrya032@dc.org|10000032|94022
+10033|Tom033|Jerrya|233 DisneyCity|tomajerrya033@dc.org|10000033|94022
+10034|Tom034|Jerrya|234 DisneyCity|tomajerrya034@dc.org|10000034|94022
+10035|Tom035|Jerrya|235 DisneyCity|tomajerrya035@dc.org|10000035|94022
+10036|Tom036|Jerrya|236 DisneyCity|tomajerrya036@dc.org|10000036|94022
+10037|Tom037|Jerrya|237 DisneyCity|tomajerrya037@dc.org|10000037|94022
+10038|Tom038|Jerrya|238 DisneyCity|tomajerrya038@dc.org|10000038|94022
+10039|Tom039|Jerrya|239 DisneyCity|tomajerrya039@dc.org|10000039|94022
+10040|Tom040|Jerrya|240 DisneyCity|tomajerrya040@dc.org|10000040|94022
+10041|Tom041|Jerrya|241 DisneyCity|tomajerrya041@dc.org|10000041|94022
+10042|Tom042|Jerrya|242 DisneyCity|tomajerrya042@dc.org|10000042|94022
+10043|Tom043|Jerrya|243 DisneyCity|tomajerrya043@dc.org|10000043|94022
+10044|Tom044|Jerrya|244 DisneyCity|tomajerrya044@dc.org|10000044|94022
+10045|Tom045|Jerrya|245 DisneyCity|tomajerrya045@dc.org|10000045|94022
+10046|Tom046|Jerrya|246 DisneyCity|tomajerrya046@dc.org|10000046|94022
+10047|Tom047|Jerrya|247 DisneyCity|tomajerrya047@dc.org|10000047|94022
+10048|Tom048|Jerrya|248 DisneyCity|tomajerrya048@dc.org|10000048|94022
+10049|Tom049|Jerrya|249 DisneyCity|tomajerrya049@dc.org|10000049|94022
+10050|Tom050|Jerrya|250 DisneyCity|tomajerrya050@dc.org|10000050|94022
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
new file mode 100644
index 0000000..7448d54
--- /dev/null
+++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
@@ -0,0 +1,223 @@
+package org.apache.griffin.measure.batch.algo
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.config.params._
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.config.reader._
+import org.apache.griffin.measure.batch.config.validator._
+import org.apache.griffin.measure.batch.connector.{CacheDataUtil, DataConnector, DataConnectorFactory}
+import org.apache.griffin.measure.batch.log.Loggable
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.griffin.measure.batch.rule.{RuleAnalyzer, RuleFactory}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Failure, Success, Try}
+
+
+@RunWith(classOf[JUnitRunner])
+class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+  val envFile = "src/test/resources/env.json"
+//  val confFile = "src/test/resources/config.json"
+  val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}"
+  val envFsType = "local"
+  val userFsType = "raw"
+
+  val args = Array(envFile, confFile)
+
+  var sc: SparkContext = _
+  var sqlContext: SQLContext = _
+
+  var allParam: AllParam = _
+
+  before {
+    // read param files
+    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-2)
+      }
+    }
+    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-2)
+      }
+    }
+    allParam = AllParam(envParam, userParam)
+
+    // validate param files
+    validateParams(allParam) match {
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-3)
+      }
+      case _ => {
+        info("params validation pass")
+      }
+    }
+
+    val metricName = userParam.name
+    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+    sc = new SparkContext(conf)
+    sqlContext = new SQLContext(sc)
+  }
+
+  test("algorithm") {
+    Try {
+      val envParam = allParam.envParam
+      val userParam = allParam.userParam
+
+      // start time
+      val startTime = new Date().getTime()
+
+      // get spark application id
+      val applicationId = sc.applicationId
+
+      // rules
+      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+      val rule: StatementExpr = ruleFactory.generateRule()
+      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+      ruleAnalyzer.globalCacheExprs.foreach(println)
+      ruleAnalyzer.globalFinalCacheExprs.foreach(println)
+
+      // global cache data
+      val globalCachedData = CacheDataUtil.genCachedMap(None, ruleAnalyzer.globalCacheExprs, Map[String, Any]())
+      val globalFinalCachedData = CacheDataUtil.filterCachedMap(ruleAnalyzer.globalFinalCacheExprs, globalCachedData)
+
+      // data connector
+      val sourceDataConnector: DataConnector =
+        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+          ruleAnalyzer.sourceGroupbyExprs, ruleAnalyzer.sourceCacheExprs,
+          ruleAnalyzer.sourceFinalCacheExprs, globalFinalCachedData,
+          ruleAnalyzer.whenClauseExpr
+        ) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("source data not available!")
+          }
+          case Failure(ex) => throw ex
+        }
+      val targetDataConnector: DataConnector =
+        DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam,
+          ruleAnalyzer.targetGroupbyExprs, ruleAnalyzer.targetCacheExprs,
+          ruleAnalyzer.targetFinalCacheExprs, globalFinalCachedData,
+          ruleAnalyzer.whenClauseExpr
+        ) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("target data not available!")
+          }
+          case Failure(ex) => throw ex
+        }
+
+      // get metadata
+//      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
+//        case Success(md) => md
+//        case Failure(ex) => throw ex
+//      }
+//      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
+//        case Success(md) => md
+//        case Failure(ex) => throw ex
+//      }
+
+      // get data
+      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+      val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+
+//      println("-- global cache exprs --")
+//      ruleAnalyzer.globalCacheExprs.foreach(a => println(s"${a._id} ${a.desc}"))
+//      println("-- global cache data --")
+//      globalCachedData.foreach(println)
+//      println("-- global final cache data --")
+//      globalFinalCachedData.foreach(println)
+//
+//      println("-- source persist exprs --")
+//      ruleAnalyzer.sourcePersistExprs.foreach(a => println(s"${a._id} ${a.desc}"))
+//      println("-- source cache exprs --")
+//      ruleAnalyzer.sourceCacheExprs.foreach(a => println(s"${a._id} ${a.desc}"))
+//
+//      println("-- source --")
+//      sourceData.foreach { a =>
+//        val printMap = ruleAnalyzer.sourcePersistExprs.flatMap { expr =>
+//          a._2.get(expr._id) match {
+//            case Some(v) => Some((expr._id + expr.desc, v))
+//            case _ => None
+//          }
+//        }.toMap
+//        println(printMap)
+//
+//        val cacheMap = ruleAnalyzer.sourceCacheExprs.flatMap { expr =>
+//          a._2.get(expr._id) match {
+//            case Some(v) => Some((expr._id + expr.desc, v))
+//            case _ => None
+//          }
+//        }.toMap
+//        println(cacheMap)
+//
+//        println(a)
+//        println(a._2.size)
+//      }
+
+//      println("-- target --")
+//      targetData.foreach { a =>
+//        val printMap = ruleAnalyzer.targetPersistExprs.flatMap { expr =>
+//          a._2.get(expr._id) match {
+//            case Some(v) => Some((expr.desc, v))
+//            case _ => None
+//          }
+//        }.toMap
+//        println(printMap)
+//      }
+
+      // my algo
+      val algo = BatchAccuracyAlgo(allParam)
+
+      // accuracy algorithm
+      val (accuResult, missingRdd, matchingRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer)
+
+      println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}")
+
+      missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourcePersistExprs, ruleAnalyzer.targetPersistExprs)).foreach(println)
+
+      // end time
+      val endTime = new Date().getTime
+      println(s"using time: ${endTime - startTime} ms")
+    } match {
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-4)
+      }
+      case _ => {
+        info("calculation finished")
+      }
+    }
+  }
+
+  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+    paramReader.readConfig[T]
+  }
+
+  private def validateParams(allParam: AllParam): Try[Boolean] = {
+    val allParamValidator = AllParamValidator()
+    allParamValidator.validate(allParam)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala
new file mode 100644
index 0000000..cf0c9b3
--- /dev/null
+++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala
@@ -0,0 +1,34 @@
+package org.apache.griffin.measure.batch.config.reader
+
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.log.Loggable
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Failure, Success}
+
+
+@RunWith(classOf[JUnitRunner])
+class ParamFileReaderTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+  test("test file reader") {
+    val userReader = ParamFileReader("src/test/resources/config1.json")
+    val envReader = ParamFileReader("src/test/resources/env1.json")
+
+    val p1 = userReader.readConfig[UserParam]
+    val p2 = envReader.readConfig[EnvParam]
+
+    p1 match {
+      case Success(v) => println(v)
+      case Failure(ex) => error(ex.getMessage)
+    }
+
+    p2 match {
+      case Success(v) => println(v)
+      case Failure(ex) => error(ex.getMessage)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
new file mode 100644
index 0000000..4715bcf
--- /dev/null
+++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
@@ -0,0 +1,74 @@
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.log.Loggable
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+
+@RunWith(classOf[JUnitRunner])
+class RuleParserTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+//  test("test rule parser") {
+//    val ruleParser = RuleParser()
+//
+////    val rules = "outTime = 24h; @Invalid ${source}['__time'] + ${outTime} > ${target}['__time']"
+//    val rules = "@Key ${source}.json()['seeds'][*].json()['metadata'].json()['tracker']['crawlRequestCreateTS'] === ${target}.json()['groups'][0][\"attrsList\"]['name'=\"CRAWLMETADATA\"]['values'][0].json()['tracker']['crawlRequestCreateTS']"
+////    val rules = "${source}['__time'] + ${outTime} > ${target}['__time']"
+////    val rules = "${source}['__time'] > ${target}['__time']"
+////    val rules = "432"
+////    val rules = "${target}.json()['groups'][0]['attrsList']['name'='URL']['values'][0]"
+//
+//    val result = ruleParser.parseAll(ruleParser.statementsExpr, rules)
+//
+//    println(result)
+//  }
+
+//  test("treat escape") {
+//    val es = """Hello\tworld\nmy name is \"ABC\""""
+//    val un = StringContext treatEscapes es
+//
+//    println(es)
+//    println(un)
+//  }
+
+  test("test rule parser") {
+    val ruleParser = RuleParser()
+
+//    val rules = "$SOUrce['tgt' < $source['tag' != 2] - -+-++---1] between ( -$target['32a'] + 9, 100, ----1000 ) and (45 > 9 or $target.type + 8 == 9 and $source['a'] >= 0) when not not not not $source._time + 24h < $target._time"
+    val rules = "$source['aaaf fd', (21, 43), '12']"
+
+    val result = ruleParser.parseAll(ruleParser.selection, rules)
+
+    println(result)
+  }
+
+  test("test rule analyzer") {
+    val ruleParser = RuleParser()
+
+//    val rules = "$source.tag == $target['take' >= 5] and $source.price + $source.price1 > $target['kk' < $target.age] and $source.ee = $target.fe + $target.a when $target.ggg = 1"
+    val rules = "$source.tag = $target.tag WHEN true"
+    val result = ruleParser.parseAll(ruleParser.rule, rules)
+    println(result)
+
+    if (result.successful) {
+      val ruleAnalyzer = RuleAnalyzer(result.get)
+
+      println("source")
+      ruleAnalyzer.sourceCacheExprs.foreach(a => println(a.desc))
+      println("source final")
+      ruleAnalyzer.sourceFinalCacheExprs.foreach(a => println(a.desc))
+      println("target")
+      ruleAnalyzer.targetCacheExprs.foreach(a => println(a.desc))
+      println("target final")
+      ruleAnalyzer.targetFinalCacheExprs.foreach(a => println(a.desc))
+      println("groupby")
+      ruleAnalyzer.sourceGroupbyExprs.foreach(println)
+      ruleAnalyzer.targetGroupbyExprs.foreach(println)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
new file mode 100644
index 0000000..7ed7f22
--- /dev/null
+++ b/measure/pom.xml
@@ -0,0 +1,193 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.griffin</groupId>
+  <artifactId>measure</artifactId>
+  <version>0.1.0-SNAPSHOT</version>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>measure-batch</module>
+  </modules>
+
+  <name>Apache Griffin :: Measures</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+
+    <scala.version>2.10.6</scala.version>
+    <spark.version>1.6.0</spark.version>
+    <scala.binary.version>2.10</scala.binary.version>
+
+    <avro.version>1.7.7</avro.version>
+    <jackson.version>2.8.7</jackson.version>
+    <scalaj.version>2.3.0</scalaj.version>
+    <junit.version>4.11</junit.version>
+    <scalatest.version>2.2.4</scalatest.version>
+    <slf4j.version>1.7.21</slf4j.version>
+    <log4j.version>1.2.16</log4j.version>
+  </properties>
+
+  <dependencies>
+    <!--scala-->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <!--spark, spark streaming, spark hive-->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <!--jackson-->
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.module</groupId>
+      <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <!--scalaj for http request-->
+    <dependency>
+      <groupId>org.scalaj</groupId>
+      <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+      <version>${scalaj.version}</version>
+    </dependency>
+
+    <!--avro-->
+    <dependency>
+      <groupId>com.databricks</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>
+      <version>2.0.1</version>
+    </dependency>
+    <!--csv-->
+    <dependency>
+      <groupId>com.databricks</groupId>
+      <artifactId>spark-csv_${scala.binary.version}</artifactId>
+      <version>1.5.0</version>
+    </dependency>
+
+    <!--log4j-->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+    <!--<dependency>-->
+    <!--<groupId>org.slf4j</groupId>-->
+    <!--<artifactId>slf4j-simple</artifactId>-->
+    <!--<version>${slf4j.version}</version>-->
+    <!--</dependency>-->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j.version}</version>
+    </dependency>
+
+    <!--junit-->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!--scala test-->
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <version>${scalatest.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.5.1</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+      <!--<plugin>-->
+        <!--<artifactId>maven-assembly-plugin</artifactId>-->
+        <!--<configuration>-->
+          <!--<descriptorRefs>-->
+            <!--<descriptorRef>jar-with-dependencies</descriptorRef>-->
+          <!--</descriptorRefs>-->
+        <!--</configuration>-->
+        <!--<executions>-->
+          <!--<execution>-->
+            <!--<id>make-assembly</id>-->
+            <!--<phase>package</phase>-->
+            <!--<goals>-->
+              <!--<goal>single</goal>-->
+            <!--</goals>-->
+          <!--</execution>-->
+        <!--</executions>-->
+      <!--</plugin>-->
+    </plugins>
+  </build>
+</project>
\ No newline at end of file