You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/05/04 03:04:12 UTC
[04/51] [partial] incubator-griffin git commit: refactor arch
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
new file mode 100644
index 0000000..fec8c49
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
@@ -0,0 +1,265 @@
+package org.apache.griffin.measure.batch.utils
+
+import scala.util.{Success, Try}
+
+
+object CalculationUtil {
+
+  /** Lets any `Option[_]` be used directly as the left operand of the operators on [[CalculationValue]]. */
+  implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v)
+
+  /**
+   * Wraps an untyped optional value and gives it arithmetic, comparison, membership and
+   * boolean operators.
+   *
+   * For arithmetic, the right operand is converted through its string form to the numeric
+   * type of the left operand. If the conversion or the operation throws a non-fatal
+   * exception, the left operand is returned unchanged. Comparison operators return `None`
+   * in that situation.
+   */
+  case class CalculationValue(value: Option[_]) extends Serializable {
+
+    /** String concatenation or numeric addition; an empty left operand yields `other`. */
+    def + (other: Option[_]): Option[_] = orSelf {
+      (value, other) match {
+        case (Some(lhs: String), Some(rhs)) => Some(lhs + rhs.toString)
+        case (Some(lhs: Byte), Some(rhs)) => Some(lhs + rhs.toString.toByte)
+        case (Some(lhs: Short), Some(rhs)) => Some(lhs + rhs.toString.toShort)
+        case (Some(lhs: Int), Some(rhs)) => Some(lhs + rhs.toString.toInt)
+        case (Some(lhs: Long), Some(rhs)) => Some(lhs + rhs.toString.toLong)
+        case (Some(lhs: Float), Some(rhs)) => Some(lhs + rhs.toString.toFloat)
+        case (Some(lhs: Double), Some(rhs)) => Some(lhs + rhs.toString.toDouble)
+        case (None, Some(_)) => other
+        case _ => value
+      }
+    }
+
+    /** Numeric subtraction; any non-numeric left operand is returned as is. */
+    def - (other: Option[_]): Option[_] = orSelf {
+      (value, other) match {
+        case (Some(lhs: Byte), Some(rhs)) => Some(lhs - rhs.toString.toByte)
+        case (Some(lhs: Short), Some(rhs)) => Some(lhs - rhs.toString.toShort)
+        case (Some(lhs: Int), Some(rhs)) => Some(lhs - rhs.toString.toInt)
+        case (Some(lhs: Long), Some(rhs)) => Some(lhs - rhs.toString.toLong)
+        case (Some(lhs: Float), Some(rhs)) => Some(lhs - rhs.toString.toFloat)
+        case (Some(lhs: Double), Some(rhs)) => Some(lhs - rhs.toString.toDouble)
+        case _ => value
+      }
+    }
+
+    /** String repetition (Int or Long count) or numeric multiplication. */
+    def * (other: Option[_]): Option[_] = orSelf {
+      (value, other) match {
+        case (Some(text: String), Some(times: Int)) => Some(text * times)
+        case (Some(text: String), Some(times: Long)) => Some(text * times.toInt)
+        case (Some(lhs: Byte), Some(rhs)) => Some(lhs * rhs.toString.toByte)
+        case (Some(lhs: Short), Some(rhs)) => Some(lhs * rhs.toString.toShort)
+        case (Some(lhs: Int), Some(rhs)) => Some(lhs * rhs.toString.toInt)
+        case (Some(lhs: Long), Some(rhs)) => Some(lhs * rhs.toString.toLong)
+        case (Some(lhs: Float), Some(rhs)) => Some(lhs * rhs.toString.toFloat)
+        case (Some(lhs: Double), Some(rhs)) => Some(lhs * rhs.toString.toDouble)
+        case _ => value
+      }
+    }
+
+    /** Numeric division; an integral division by zero throws and so yields the left operand. */
+    def / (other: Option[_]): Option[_] = orSelf {
+      (value, other) match {
+        case (Some(lhs: Byte), Some(rhs)) => Some(lhs / rhs.toString.toByte)
+        case (Some(lhs: Short), Some(rhs)) => Some(lhs / rhs.toString.toShort)
+        case (Some(lhs: Int), Some(rhs)) => Some(lhs / rhs.toString.toInt)
+        case (Some(lhs: Long), Some(rhs)) => Some(lhs / rhs.toString.toLong)
+        case (Some(lhs: Float), Some(rhs)) => Some(lhs / rhs.toString.toFloat)
+        case (Some(lhs: Double), Some(rhs)) => Some(lhs / rhs.toString.toDouble)
+        case _ => value
+      }
+    }
+
+    /** Numeric remainder. */
+    def % (other: Option[_]): Option[_] = orSelf {
+      (value, other) match {
+        case (Some(lhs: Byte), Some(rhs)) => Some(lhs % rhs.toString.toByte)
+        case (Some(lhs: Short), Some(rhs)) => Some(lhs % rhs.toString.toShort)
+        case (Some(lhs: Int), Some(rhs)) => Some(lhs % rhs.toString.toInt)
+        case (Some(lhs: Long), Some(rhs)) => Some(lhs % rhs.toString.toLong)
+        case (Some(lhs: Float), Some(rhs)) => Some(lhs % rhs.toString.toFloat)
+        case (Some(lhs: Double), Some(rhs)) => Some(lhs % rhs.toString.toDouble)
+        case _ => value
+      }
+    }
+
+    /** Negates numbers, inverts booleans, reverses strings; other values pass through. */
+    def unary_- (): Option[_] = value match {
+      case Some(text: String) => Some(text.reverse.toString)
+      case Some(flag: Boolean) => Some(!flag)
+      case Some(n: Byte) => Some(-n)
+      case Some(n: Short) => Some(-n)
+      case Some(n: Int) => Some(-n)
+      case Some(n: Long) => Some(-n)
+      case Some(n: Float) => Some(-n)
+      case Some(n: Double) => Some(-n)
+      case Some(anything) => Some(anything)
+      case _ => None
+    }
+
+
+    /** Equality of the two wrapped values; `None` when either side is empty. */
+    def === (other: Option[_]): Option[Boolean] =
+      for (lhs <- value; rhs <- other) yield lhs == rhs
+
+    /** Inequality of the two wrapped values; `None` when either side is empty. */
+    def =!= (other: Option[_]): Option[Boolean] =
+      for (lhs <- value; rhs <- other) yield lhs != rhs
+
+    def > (other: Option[_]): Option[Boolean] = ordered(other)(_ > _, _ > _)
+
+    def >= (other: Option[_]): Option[Boolean] = ordered(other)(_ >= _, _ >= _)
+
+    def < (other: Option[_]): Option[Boolean] = ordered(other)(_ < _, _ < _)
+
+    def <= (other: Option[_]): Option[Boolean] = ordered(other)(_ <= _, _ <= _)
+
+
+    /** True as soon as one candidate equals this value. */
+    def in (other: Iterable[Option[_]]): Option[Boolean] = {
+      val seed: Option[Boolean] = Some(false)
+      other.foldLeft(seed)((found, candidate) => optOr(found, this === candidate))
+    }
+
+    /** False as soon as one candidate equals this value. */
+    def not_in (other: Iterable[Option[_]]): Option[Boolean] = {
+      val seed: Option[Boolean] = Some(true)
+      other.foldLeft(seed)((allDiffer, candidate) => optAnd(allDiffer, this =!= candidate))
+    }
+
+    /** Inclusive range check against the first two elements; `None` if fewer than two are given. */
+    def between (other: Iterable[Option[_]]): Option[Boolean] = other.take(2).toList match {
+      case begin :: end :: Nil => optAnd(this >= begin, this <= end)
+      case _ => None
+    }
+
+    /** Strictly-outside range check against the first two elements; `None` if fewer than two are given. */
+    def not_between (other: Iterable[Option[_]]): Option[Boolean] = other.take(2).toList match {
+      case begin :: end :: Nil => optOr(this < begin, this > end)
+      case _ => None
+    }
+
+    def unary_! (): Option[Boolean] = optNot(value)
+
+    def && (other: Option[_]): Option[Boolean] = optAnd(value, other)
+
+    def || (other: Option[_]): Option[Boolean] = optOr(value, other)
+
+
+    /** Runs an arithmetic computation, falling back to the wrapped value on a non-fatal exception. */
+    private def orSelf(body: => Option[_]): Option[_] = Try[Option[_]](body).getOrElse(value)
+
+    /** Runs a comparison, falling back to `None` on a non-fatal exception. */
+    private def orNone(body: => Option[Boolean]): Option[Boolean] = Try(body).getOrElse(None)
+
+    /** Widens the primitive numeric types handled by the comparison operators to Double. */
+    private def asDouble(v: Any): Option[Double] = v match {
+      case n: Byte => Some(n.toDouble)
+      case n: Short => Some(n.toDouble)
+      case n: Int => Some(n.toDouble)
+      case n: Long => Some(n.toDouble)
+      case n: Float => Some(n.toDouble)
+      case n: Double => Some(n)
+      case _ => None
+    }
+
+    /**
+     * Shared body of the ordering operators: two strings are compared as strings; a numeric
+     * left operand is compared, as a Double, with the right operand parsed from its string form.
+     */
+    private def ordered(other: Option[_])(onStrings: (String, String) => Boolean,
+                                          onNumbers: (Double, Double) => Boolean): Option[Boolean] = orNone {
+      (value, other) match {
+        case (Some(lhs: String), Some(rhs: String)) => Some(onStrings(lhs, rhs))
+        case (Some(lhs), Some(rhs)) => asDouble(lhs).map(n => onNumbers(n, rhs.toString.toDouble))
+        case _ => None
+      }
+    }
+
+    /** Boolean negation; `None` for anything that is not a boolean. */
+    private def optNot(a: Option[_]): Option[Boolean] = a.collect { case flag: Boolean => !flag }
+
+    /** Conjunction where a single `false` decides the result even if the other side is unknown. */
+    private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] =
+      if (a == Some(false) || b == Some(false)) Some(false)
+      else (a, b) match {
+        case (Some(x: Boolean), Some(y: Boolean)) => Some(x && y)
+        case _ => None
+      }
+
+    /** Disjunction where a single `true` decides the result even if the other side is unknown. */
+    private def optOr(a: Option[_], b: Option[_]): Option[Boolean] =
+      if (a == Some(true) || b == Some(true)) Some(true)
+      else (a, b) match {
+        case (Some(x: Boolean), Some(y: Boolean)) => Some(x || y)
+        case _ => None
+      }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
new file mode 100644
index 0000000..b48478a
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
@@ -0,0 +1,62 @@
+package org.apache.griffin.measure.batch.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
+
+/**
+ * Thin helpers around a single Hadoop `FileSystem` obtained from a default
+ * `Configuration` with `dfs.support.append` set to `"true"`.
+ */
+object HdfsUtil {
+
+  private val separator = "/"
+
+  private val conf = new Configuration()
+  conf.set("dfs.support.append", "true")
+
+  private val dfs = FileSystem.get(conf)
+
+  /** Returns whether `filePath` exists on the file system. */
+  def existPath(filePath: String): Boolean = {
+    val path = new Path(filePath)
+    dfs.exists(path)
+  }
+
+  /**
+   * Creates `filePath`, first deleting (recursively) whatever already exists there.
+   * The caller owns the returned stream and must close it.
+   */
+  def createFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+    dfs.create(path)
+  }
+
+  /**
+   * Opens `filePath` for appending when it exists, otherwise creates it.
+   * The caller owns the returned stream and must close it.
+   */
+  def appendOrCreateFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
+  }
+
+  /** Opens `filePath` for reading. The caller owns the returned stream and must close it. */
+  def openFile(filePath: String): FSDataInputStream = {
+    val path = new Path(filePath)
+    dfs.open(path)
+  }
+
+  /** Replaces the content of `filePath` with `message` encoded as UTF-8. */
+  def writeContent(filePath: String, message: String): Unit = {
+    val out = createFile(filePath)
+    // close in finally so a failed write does not leak the stream
+    try out.write(message.getBytes("utf-8")) finally out.close()
+  }
+
+  /** Appends `message`, encoded as UTF-8, to `filePath`, creating the file when missing. */
+  def appendContent(filePath: String, message: String): Unit = {
+    val out = appendOrCreateFile(filePath)
+    // close in finally so a failed write does not leak the stream
+    try out.write(message.getBytes("utf-8")) finally out.close()
+  }
+
+  /** Creates (or truncates) `filePath` as an empty file. */
+  def createEmptyFile(filePath: String): Unit = {
+    val out = createFile(filePath)
+    out.close()
+  }
+
+
+  /** Joins `parentPath` and `fileName` with exactly one "/" between them when the parent lacks a trailing one. */
+  def getHdfsFilePath(parentPath: String, fileName: String): String = {
+    if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName
+  }
+
+  /** Recursively deletes `dirPath` when it exists; does nothing otherwise. */
+  def deleteHdfsPath(dirPath: String): Unit = {
+    val path = new Path(dirPath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
new file mode 100644
index 0000000..747d0fa
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
@@ -0,0 +1,30 @@
+package org.apache.griffin.measure.batch.utils
+
+import scalaj.http._
+
+/** Minimal HTTP helpers built on scalaj-http; each call reports the response status code as a string. */
+object HttpUtil {
+
+  val GET_REGEX = """^(?i)get$""".r
+  val POST_REGEX = """^(?i)post$""".r
+  val PUT_REGEX = """^(?i)put$""".r
+  val DELETE_REGEX = """^(?i)delete$""".r
+
+  /** POSTs `data` to `url` and returns the response status code as a string. */
+  def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
+    val request = prepare(url, params, headers)
+    val statusCode = request.postData(data).asString.code
+    statusCode.toString
+  }
+
+  /**
+   * Sends `data` using `method` (matched case-insensitively). Only POST and PUT are
+   * handled; any other method returns the literal "wrong method" without sending anything.
+   */
+  def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
+    val request = prepare(url, params, headers)
+    method match {
+      case POST_REGEX() => request.postData(data).asString.code.toString
+      case PUT_REGEX() => request.put(data).asString.code.toString
+      case _ => "wrong method"
+    }
+  }
+
+  /** Builds the request for `url` with stringified query parameters and headers. */
+  private def prepare(url: String, params: Map[String, Object], headers: Map[String, Object]) =
+    Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers))
+
+  /** Converts every value of the map to its string form. */
+  private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = {
+    for ((key, obj) <- map) yield key -> obj.toString
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
new file mode 100644
index 0000000..cdd470a
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
@@ -0,0 +1,32 @@
+package org.apache.griffin.measure.batch.utils
+
+import java.io.InputStream
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import scala.reflect._
+
+/** JSON (de)serialization helpers sharing one Jackson mapper with Scala-module support. */
+object JsonUtil {
+  val mapper = new ObjectMapper()
+  // register Scala types and ignore JSON properties the target type does not declare
+  mapper.registerModule(DefaultScalaModule).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
+  /** Serializes a symbol-keyed map, using each symbol's name as the JSON key. */
+  def toJson(value: Map[Symbol, Any]): String = {
+    val stringKeyed = for ((key, item) <- value) yield key.name -> item
+    toJson(stringKeyed)
+  }
+
+  /** Serializes any value to its JSON string form. */
+  def toJson(value: Any): String = {
+    mapper.writeValueAsString(value)
+  }
+
+  /** Parses `json` into a string-keyed map. */
+  def toMap[V](json:String)(implicit m: Manifest[V]) =
+    fromJson[Map[String,V]](json)
+
+  /** Parses the JSON string into the runtime class of `T`. */
+  def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = {
+    val targetClass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
+    mapper.readValue[T](json, targetClass)
+  }
+
+  /** Parses JSON read from the stream into the runtime class of `T`. */
+  def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = {
+    val targetClass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
+    mapper.readValue[T](is, targetClass)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
new file mode 100644
index 0000000..7f2b355
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.measure.batch.utils
+
+object StringParseUtil {
+
+  /**
+   * Splits `str` on `sep` and trims surrounding whitespace from every piece.
+   * `sep` is handed to `String.split`, so it is interpreted as a regular expression.
+   */
+  def sepStrings(str: String, sep: String): Iterable[String] = {
+    for (piece <- str.split(sep)) yield piece.trim
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/config.json b/measure/measure-batch/src/test/resources/config.json
new file mode 100644
index 0000000..65e0ed9
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/config.json
@@ -0,0 +1,25 @@
+{
+ "name": "accu1",
+ "type": "accuracy",
+
+ "source": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
+ }
+ },
+
+ "target": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_target.avro"
+ }
+ },
+
+ "evaluateRule": {
+ "sampleRatio": 1,
+ "rules": "$source.user_id > 10020 AND $source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/config1.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/config1.json b/measure/measure-batch/src/test/resources/config1.json
new file mode 100644
index 0000000..d7290ba
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/config1.json
@@ -0,0 +1,27 @@
+{
+ "name": "accu-test",
+ "type": "accuracy",
+
+ "source": {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "table.name": "rheos_view_event",
+ "partitions": "dt=20170410, hour=15"
+ }
+ },
+
+ "target": {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "table.name": "be_view_event_queue",
+ "partitions": "dt=20170410, hour=15; dt=20170410, hour=16"
+ }
+ },
+
+ "evaluateRule": {
+ "sampleRatio": 1,
+ "rules": "@Key ${source}['uid'] === ${target}['uid']; @Key ${source}['eventtimestamp'] === ${target}['eventtimestamp']; ${source}['page_id'] === ${target}['page_id']; ${source}['site_id'] === ${target}['site_id']; ${source}['itm'] === ${target}['itm']"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/env.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/env.json b/measure/measure-batch/src/test/resources/env.json
new file mode 100644
index 0000000..3a9e38c
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/env.json
@@ -0,0 +1,27 @@
+{
+ "spark": {
+ "log.level": "ERROR",
+ "checkpoint.dir": "hdfs:///griffin/batch/cp",
+ "config": {}
+ },
+
+ "persist": [
+ {
+ "type": "hdfs",
+ "config": {
+ "path": "hdfs:///griffin/streaming/persist"
+ }
+ },
+ {
+ "type": "http",
+ "config": {
+ "method": "post",
+ "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/"
+ }
+ }
+ ],
+
+ "cleaner": {
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/env1.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/env1.json b/measure/measure-batch/src/test/resources/env1.json
new file mode 100644
index 0000000..a059715
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/env1.json
@@ -0,0 +1,21 @@
+{
+ "spark": {
+ "log.level": "INFO",
+ "checkpoint.dir": "hdfs:///griffin/batch/cp",
+ "config": {}
+ },
+
+ "persist": [
+ {
+ "type": "hdfs",
+ "config": {
+ "path": "hdfs:///user/b_des/bark/griffin-batch/test",
+ "max.lines.per.file": 10000
+ }
+ }
+ ],
+
+ "cleaner": {
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/log4j.properties b/measure/measure-batch/src/test/resources/log4j.properties
new file mode 100644
index 0000000..bd31e15
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_src.avro
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_src.avro b/measure/measure-batch/src/test/resources/users_info_src.avro
new file mode 100644
index 0000000..3d5c939
Binary files /dev/null and b/measure/measure-batch/src/test/resources/users_info_src.avro differ
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_src.dat
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_src.dat b/measure/measure-batch/src/test/resources/users_info_src.dat
new file mode 100644
index 0000000..ce49443
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/users_info_src.dat
@@ -0,0 +1,50 @@
+10001|Tom001|Jerrya|201 DisneyCity|tomajerrya001@dc.org|10000001|94022
+10002|Tom002|Jerrya|202 DisneyCity|tomajerrya002@dc.org|10000002|94022
+10003|Tom003|Jerrya|203 DisneyCity|tomajerrya003@dc.org|10000003|94022
+10004|Tom004|Jerrya|204 DisneyCity|tomajerrya004@dc.org|10000004|94022
+10005|Tom005|Jerrya|205 DisneyCity|tomajerrya005@dc.org|10000005|94022
+10006|Tom006|Jerrya|206 DisneyCity|tomajerrya006@dc.org|10000006|94022
+10007|Tom007|Jerrya|207 DisneyCity|tomajerrya007@dc.org|10000007|94022
+10008|Tom008|Jerrya|208 DisneyCity|tomajerrya008@dc.org|10000008|94022
+10009|Tom009|Jerrya|209 DisneyCity|tomajerrya009@dc.org|10000009|94022
+10010|Tom010|Jerrya|210 DisneyCity|tomajerrya010@dc.org|10000010|94022
+10011|Tom011|Jerrya|211 DisneyCity|tomajerrya011@dc.org|10000011|94022
+10012|Tom012|Jerrya|212 DisneyCity|tomajerrya012@dc.org|10000012|94022
+10013|Tom013|Jerrya|213 DisneyCity|tomajerrya013@dc.org|10000013|94022
+10014|Tom014|Jerrya|214 DisneyCity|tomajerrya014@dc.org|10000014|94022
+10015|Tom015|Jerrya|215 DisneyCity|tomajerrya015@dc.org|10000015|94022
+10016|Tom016|Jerrya|216 DisneyCity|tomajerrya016@dc.org|10000016|94022
+10017|Tom017|Jerrya|217 DisneyCity|tomajerrya017@dc.org|10000017|94022
+10018|Tom018|Jerrya|218 DisneyCity|tomajerrya018@dc.org|10000018|94022
+10019|Tom019|Jerrya|219 DisneyCity|tomajerrya019@dc.org|10000019|94022
+10020|Tom020|Jerrya|220 DisneyCity|tomajerrya020@dc.org|10000020|94022
+10021|Tom021|Jerrya|221 DisneyCity|tomajerrya021@dc.org|10000021|94022
+10022|Tom022|Jerrya|222 DisneyCity|tomajerrya022@dc.org|10000022|94022
+10023|Tom023|Jerrya|223 DisneyCity|tomajerrya023@dc.org|10000023|94022
+10024|Tom024|Jerrya|224 DisneyCity|tomajerrya024@dc.org|10000024|94022
+10025|Tom025|Jerrya|225 DisneyCity|tomajerrya025@dc.org|10000025|94022
+10026|Tom026|Jerrya|226 DisneyCity|tomajerrya026@dc.org|10000026|94022
+10027|Tom027|Jerrya|227 DisneyCity|tomajerrya027@dc.org|10000027|94022
+10028|Tom028|Jerrya|228 DisneyCity|tomajerrya028@dc.org|10000028|94022
+10029|Tom029|Jerrya|229 DisneyCity|tomajerrya029@dc.org|10000029|94022
+10030|Tom030|Jerrya|230 DisneyCity|tomajerrya030@dc.org|10000030|94022
+10031|Tom031|Jerrya|231 DisneyCity|tomajerrya031@dc.org|10000031|94022
+10032|Tom032|Jerrya|232 DisneyCity|tomajerrya032@dc.org|10000032|94022
+10033|Tom033|Jerrya|233 DisneyCity|tomajerrya033@dc.org|10000033|94022
+10034|Tom034|Jerrya|234 DisneyCity|tomajerrya034@dc.org|10000034|94022
+10035|Tom035|Jerrya|235 DisneyCity|tomajerrya035@dc.org|10000035|94022
+10036|Tom036|Jerrya|236 DisneyCity|tomajerrya036@dc.org|10000036|94022
+10037|Tom037|Jerrya|237 DisneyCity|tomajerrya037@dc.org|10000037|94022
+10038|Tom038|Jerrya|238 DisneyCity|tomajerrya038@dc.org|10000038|94022
+10039|Tom039|Jerrya|239 DisneyCity|tomajerrya039@dc.org|10000039|94022
+10040|Tom040|Jerrya|240 DisneyCity|tomajerrya040@dc.org|10000040|94022
+10041|Tom041|Jerrya|241 DisneyCity|tomajerrya041@dc.org|10000041|94022
+10042|Tom042|Jerrya|242 DisneyCity|tomajerrya042@dc.org|10000042|94022
+10043|Tom043|Jerrya|243 DisneyCity|tomajerrya043@dc.org|10000043|94022
+10044|Tom044|Jerrya|244 DisneyCity|tomajerrya044@dc.org|10000044|94022
+10045|Tom045|Jerrya|245 DisneyCity|tomajerrya045@dc.org|10000045|94022
+10046|Tom046|Jerrya|246 DisneyCity|tomajerrya046@dc.org|10000046|94022
+10047|Tom047|Jerrya|247 DisneyCity|tomajerrya047@dc.org|10000047|94022
+10048|Tom048|Jerrya|248 DisneyCity|tomajerrya048@dc.org|10000048|94022
+10049|Tom049|Jerrya|249 DisneyCity|tomajerrya049@dc.org|10000049|94022
+10050|Tom050|Jerrya|250 DisneyCity|tomajerrya050@dc.org|10000050|94022
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_target.avro
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_target.avro b/measure/measure-batch/src/test/resources/users_info_target.avro
new file mode 100644
index 0000000..104dd6c
Binary files /dev/null and b/measure/measure-batch/src/test/resources/users_info_target.avro differ
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/resources/users_info_target.dat
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_target.dat b/measure/measure-batch/src/test/resources/users_info_target.dat
new file mode 100644
index 0000000..07a6b40
--- /dev/null
+++ b/measure/measure-batch/src/test/resources/users_info_target.dat
@@ -0,0 +1,50 @@
+10001|Tom001|Jerrya|201 DisneyCity|tomajerrya001@dc.org|10000101|94022
+10002|Tom002|Jerrya|202 DisneyCity|tomajerrya002@dc.org|10000102|94022
+10003|Tom003|Jerrya|203 DisneyCity|tomajerrya003@dc.org|10000003|94022
+10004|Tom004|Jerrya|204 DisneyCity|tomajerrya004@dc.org|10000004|94022
+10005|Tom005|Jerrya|205 DisneyCity|tomajerrya005@dc.org|10000005|94022
+10006|Tom006|Jerrya|206 DisneyCity|tomajerrya006@dc.org|10000006|94022
+10007|Tom007|Jerrya|207 DisneyCity|tomajerrya007@dc.org|10000007|94022
+10008|Tom008|Jerrya|208 DisneyCity|tomajerrya008@dc.org|10000008|94022
+10009|Tom009|Jerrya|209 DisneyCity|tomajerrya009@dc.org|10000009|94022
+10010|Tom010|Jerrya|210 DisneyCity|tomajerrya010@dc.org|10000010|94022
+10011|Tom011|Jerrya|211 DisneyCity|tomajerrya011@dc.org|10000011|94022
+10012|Tom012|Jerrya|212 DisneyCity|tomajerrya012@dc.org|10000012|94022
+10013|Tom013|Jerrya|213 DisneyCity|tomajerrya013@dc.org|10000013|94022
+10014|Tom014|Jerrya|214 DisneyCity|tomajerrya014@dc.org|10000014|94022
+10015|Tom015|Jerrya|215 DisneyCity|tomajerrya015@dc.org|10000015|94022
+10016|Tom016|Jerrya|216 DisneyCity|tomajerrya016@dc.org|10000016|94022
+10017|Tom017|Jerrya|217 DisneyCity|tomajerrya017@dc.org|10000017|94022
+10018|Tom018|Jerrya|218 DisneyCity|tomajerrya018@dc.org|10000018|94022
+10019|Tom019|Jerrya|219 DisneyCity|tomajerrya019@dc.org|10000019|94022
+10020|Tom020|Jerrya|220 DisneyCity|tomajerrya020@dc.org|10000020|94022
+10021|Tom021|Jerrya|221 DisneyCity|tomajerrya021@dc.org|10000021|94022
+10022|Tom022|Jerrya|222 DisneyCity|tomajerrya022@dc.org|10000022|94022
+10023|Tom023|Jerrya|223 DisneyCity|tomajerrya023@dc.org|10000023|94022
+10024|Tom024|Jerrya|224 DisneyCity|tomajerrya024@dc.org|10000024|94022
+10025|Tom025|Jerrya|225 DisneyCity|tomajerrya025@dc.org|10000025|94022
+10026|Tom026|Jerrya|226 DisneyCity|tomajerrya026@dc.org|10000026|94022
+10027|Tom027|Jerrya|227 DisneyCity|tomajerrya027@dc.org|10000027|94022
+10028|Tom028|Jerrya|228 DisneyCity|tomajerrya028@dc.org|10000028|94022
+10029|Tom029|Jerrya|229 DisneyCity|tomajerrya029@dc.org|10000029|94022
+10030|Tom030|Jerrya|230 DisneyCity|tomajerrya030@dc.org|10000030|94022
+10031|Tom031|Jerrya|231 DisneyCity|tomajerrya031@dc.org|10000031|94022
+10032|Tom032|Jerrya|232 DisneyCity|tomajerrya032@dc.org|10000032|94022
+10033|Tom033|Jerrya|233 DisneyCity|tomajerrya033@dc.org|10000033|94022
+10034|Tom034|Jerrya|234 DisneyCity|tomajerrya034@dc.org|10000034|94022
+10035|Tom035|Jerrya|235 DisneyCity|tomajerrya035@dc.org|10000035|94022
+10036|Tom036|Jerrya|236 DisneyCity|tomajerrya036@dc.org|10000036|94022
+10037|Tom037|Jerrya|237 DisneyCity|tomajerrya037@dc.org|10000037|94022
+10038|Tom038|Jerrya|238 DisneyCity|tomajerrya038@dc.org|10000038|94022
+10039|Tom039|Jerrya|239 DisneyCity|tomajerrya039@dc.org|10000039|94022
+10040|Tom040|Jerrya|240 DisneyCity|tomajerrya040@dc.org|10000040|94022
+10041|Tom041|Jerrya|241 DisneyCity|tomajerrya041@dc.org|10000041|94022
+10042|Tom042|Jerrya|242 DisneyCity|tomajerrya042@dc.org|10000042|94022
+10043|Tom043|Jerrya|243 DisneyCity|tomajerrya043@dc.org|10000043|94022
+10044|Tom044|Jerrya|244 DisneyCity|tomajerrya044@dc.org|10000044|94022
+10045|Tom045|Jerrya|245 DisneyCity|tomajerrya045@dc.org|10000045|94022
+10046|Tom046|Jerrya|246 DisneyCity|tomajerrya046@dc.org|10000046|94022
+10047|Tom047|Jerrya|247 DisneyCity|tomajerrya047@dc.org|10000047|94022
+10048|Tom048|Jerrya|248 DisneyCity|tomajerrya048@dc.org|10000048|94022
+10049|Tom049|Jerrya|249 DisneyCity|tomajerrya049@dc.org|10000049|94022
+10050|Tom050|Jerrya|250 DisneyCity|tomajerrya050@dc.org|10000050|94022
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
new file mode 100644
index 0000000..7448d54
--- /dev/null
+++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
@@ -0,0 +1,223 @@
+package org.apache.griffin.measure.batch.algo
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.config.params._
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.config.reader._
+import org.apache.griffin.measure.batch.config.validator._
+import org.apache.griffin.measure.batch.connector.{CacheDataUtil, DataConnector, DataConnectorFactory}
+import org.apache.griffin.measure.batch.log.Loggable
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.griffin.measure.batch.rule.{RuleAnalyzer, RuleFactory}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Failure, Success, Try}
+
+
+@RunWith(classOf[JUnitRunner])
+class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+ val envFile = "src/test/resources/env.json" // env params path, read in `before` with envFsType
+// val confFile = "src/test/resources/config.json" -- disabled file-based variant; the active confFile below is an inline JSON string
+ val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}"
+ val envFsType = "local" // fsType passed to readParamFile together with envFile
+ val userFsType = "raw" // fsType passed with confFile; NOTE(review): presumably "raw" means inline content rather than a path - confirm in ParamReaderFactory
+
+ val args = Array(envFile, confFile) // not referenced anywhere else in this class
+
+ var sc: SparkContext = _ // assigned in `before`
+ var sqlContext: SQLContext = _ // assigned in `before`, built on top of sc
+
+ var allParam: AllParam = _ // env + user params, assembled in `before`
+
+ // Per-test setup: load and validate the env/user params, then start a local Spark context.
+ // Any failure is reported through ScalaTest's fail(...) instead of sys.exit, which would
+ // terminate the JVM running the whole test suite rather than failing this one test.
+ before {
+   // read param files
+   val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+     case Success(p) => p
+     case Failure(ex) =>
+       error(ex.getMessage)
+       fail(s"failed to read env param file: ${ex.getMessage}", ex)
+   }
+   val userParam = readParamFile[UserParam](confFile, userFsType) match {
+     case Success(p) => p
+     case Failure(ex) =>
+       error(ex.getMessage)
+       fail(s"failed to read user param config: ${ex.getMessage}", ex)
+   }
+   allParam = AllParam(envParam, userParam)
+
+   // validate param files
+   validateParams(allParam) match {
+     case Failure(ex) =>
+       error(ex.getMessage)
+       fail(s"params validation failed: ${ex.getMessage}", ex)
+     case _ => info("params validation pass")
+   }
+
+   val conf = new SparkConf().setMaster("local[*]").setAppName(userParam.name)
+   sc = new SparkContext(conf)
+   sqlContext = new SQLContext(sc)
+ }
+
+ // Only one SparkContext may be active per JVM, so release it after every test.
+ after { if (sc != null) sc.stop() }
+
+ // End-to-end run of the batch accuracy flow: build the rule and its analyzer, open the
+ // source/target data connectors, load both RDDs, run BatchAccuracyAlgo.accuracy and print
+ // the match percentage, total count and missing records. There are no value assertions;
+ // the test fails only when one of the steps throws.
+ test("algorithm") {
+   // exceptions thrown by any step below are captured by the Try and handled in the Failure branch
+   Try {
+     val userParam = allParam.userParam
+
+     // start time
+     val startTime = new Date().getTime()
+
+     // rules
+     val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+     val rule: StatementExpr = ruleFactory.generateRule()
+     val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+     ruleAnalyzer.globalCacheExprs.foreach(println)
+     ruleAnalyzer.globalFinalCacheExprs.foreach(println)
+
+     // global cache data
+     val globalCachedData = CacheDataUtil.genCachedMap(None, ruleAnalyzer.globalCacheExprs, Map[String, Any]())
+     val globalFinalCachedData = CacheDataUtil.filterCachedMap(ruleAnalyzer.globalFinalCacheExprs, globalCachedData)
+
+     // data connector
+     val sourceDataConnector: DataConnector =
+       DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+         ruleAnalyzer.sourceGroupbyExprs, ruleAnalyzer.sourceCacheExprs,
+         ruleAnalyzer.sourceFinalCacheExprs, globalFinalCachedData,
+         ruleAnalyzer.whenClauseExpr
+       ) match {
+         case Success(cntr) => {
+           if (cntr.available) cntr
+           else throw new Exception("source data not available!")
+         }
+         case Failure(ex) => throw ex
+       }
+     val targetDataConnector: DataConnector =
+       DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam,
+         ruleAnalyzer.targetGroupbyExprs, ruleAnalyzer.targetCacheExprs,
+         ruleAnalyzer.targetFinalCacheExprs, globalFinalCachedData,
+         ruleAnalyzer.whenClauseExpr
+       ) match {
+         case Success(cntr) => {
+           if (cntr.available) cntr
+           else throw new Exception("target data not available!")
+         }
+         case Failure(ex) => throw ex
+       }
+
+     // get metadata
+// val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
+// case Success(md) => md
+// case Failure(ex) => throw ex
+// }
+// val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
+// case Success(md) => md
+// case Failure(ex) => throw ex
+// }
+
+     // get data
+     val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+       case Success(dt) => dt
+       case Failure(ex) => throw ex
+     }
+     val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match {
+       case Success(dt) => dt
+       case Failure(ex) => throw ex
+     }
+
+// println("-- global cache exprs --")
+// ruleAnalyzer.globalCacheExprs.foreach(a => println(s"${a._id} ${a.desc}"))
+// println("-- global cache data --")
+// globalCachedData.foreach(println)
+// println("-- global final cache data --")
+// globalFinalCachedData.foreach(println)
+//
+// println("-- source persist exprs --")
+// ruleAnalyzer.sourcePersistExprs.foreach(a => println(s"${a._id} ${a.desc}"))
+// println("-- source cache exprs --")
+// ruleAnalyzer.sourceCacheExprs.foreach(a => println(s"${a._id} ${a.desc}"))
+//
+// println("-- source --")
+// sourceData.foreach { a =>
+// val printMap = ruleAnalyzer.sourcePersistExprs.flatMap { expr =>
+// a._2.get(expr._id) match {
+// case Some(v) => Some((expr._id + expr.desc, v))
+// case _ => None
+// }
+// }.toMap
+// println(printMap)
+//
+// val cacheMap = ruleAnalyzer.sourceCacheExprs.flatMap { expr =>
+// a._2.get(expr._id) match {
+// case Some(v) => Some((expr._id + expr.desc, v))
+// case _ => None
+// }
+// }.toMap
+// println(cacheMap)
+//
+// println(a)
+// println(a._2.size)
+// }
+
+// println("-- target --")
+// targetData.foreach { a =>
+// val printMap = ruleAnalyzer.targetPersistExprs.flatMap { expr =>
+// a._2.get(expr._id) match {
+// case Some(v) => Some((expr.desc, v))
+// case _ => None
+// }
+// }.toMap
+// println(printMap)
+// }
+
+     // my algo
+     val algo = BatchAccuracyAlgo(allParam)
+
+     // accuracy algorithm
+     val (accuResult, missingRdd, matchingRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer)
+
+     println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}")
+
+     missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourcePersistExprs, ruleAnalyzer.targetPersistExprs)).foreach(println)
+
+     // end time
+     val endTime = new Date().getTime
+     println(s"using time: ${endTime - startTime} ms")
+   } match {
+     case Failure(ex) =>
+       error(ex.getMessage)
+       // report through ScalaTest rather than sys.exit, which would terminate the JVM
+       // running the whole suite instead of failing this test
+       fail(s"accuracy calculation failed: ${ex.getMessage}", ex)
+     case _ => info("calculation finished")
+   }
+ }
+
+ // Asks ParamReaderFactory for the reader matching `file` and `fsType`, then returns
+ // whatever that reader's readConfig[T] yields for the requested Param subtype.
+ private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] =
+   ParamReaderFactory.getParamReader(file, fsType).readConfig[T]
+
+ // Validates the combined env/user params with a fresh AllParamValidator and hands
+ // back its Try result unchanged.
+ private def validateParams(allParam: AllParam): Try[Boolean] =
+   AllParamValidator().validate(allParam)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala
new file mode 100644
index 0000000..cf0c9b3
--- /dev/null
+++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala
@@ -0,0 +1,34 @@
+package org.apache.griffin.measure.batch.config.reader
+
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.log.Loggable
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Failure, Success}
+
+
+@RunWith(classOf[JUnitRunner])
+class ParamFileReaderTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+  // Reads the sample user and env param files and prints what was parsed. A Failure from
+  // either read fails the test; merely logging it would let the test pass with nothing read.
+  test("test file reader") {
+    val userReader = ParamFileReader("src/test/resources/config1.json")
+    val envReader = ParamFileReader("src/test/resources/env1.json")
+
+    val p1 = userReader.readConfig[UserParam]
+    val p2 = envReader.readConfig[EnvParam]
+    p1 match {
+      case Success(v) => println(v)
+      case Failure(ex) => fail(s"failed to read user param file: ${ex.getMessage}", ex)
+    }
+    p2 match {
+      case Success(v) => println(v)
+      case Failure(ex) => fail(s"failed to read env param file: ${ex.getMessage}", ex)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
new file mode 100644
index 0000000..4715bcf
--- /dev/null
+++ b/measure/measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
@@ -0,0 +1,74 @@
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.log.Loggable
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+
+@RunWith(classOf[JUnitRunner])
+class RuleParserTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+// test("test rule parser") {
+// val ruleParser = RuleParser()
+//
+//// val rules = "outTime = 24h; @Invalid ${source}['__time'] + ${outTime} > ${target}['__time']"
+// val rules = "@Key ${source}.json()['seeds'][*].json()['metadata'].json()['tracker']['crawlRequestCreateTS'] === ${target}.json()['groups'][0][\"attrsList\"]['name'=\"CRAWLMETADATA\"]['values'][0].json()['tracker']['crawlRequestCreateTS']"
+//// val rules = "${source}['__time'] + ${outTime} > ${target}['__time']"
+//// val rules = "${source}['__time'] > ${target}['__time']"
+//// val rules = "432"
+//// val rules = "${target}.json()['groups'][0]['attrsList']['name'='URL']['values'][0]"
+//
+// val result = ruleParser.parseAll(ruleParser.statementsExpr, rules)
+//
+// println(result)
+// }
+
+// test("treat escape") {
+// val es = """Hello\tworld\nmy name is \"ABC\""""
+// val un = StringContext treatEscapes es
+//
+// println(es)
+// println(un)
+// }
+
+ // Runs the `selection` parser over a sample selection expression and prints the outcome.
+ test("test rule parser") {
+   val parser = RuleParser()
+
+// val rules = "$SOUrce['tgt' < $source['tag' != 2] - -+-++---1] between ( -$target['32a'] + 9, 100, ----1000 ) and (45 > 9 or $target.type + 8 == 9 and $source['a'] >= 0) when not not not not $source._time + 24h < $target._time"
+   val selectionText = "$source['aaaf fd', (21, 43), '12']"
+
+   // the parse outcome is only printed, nothing is asserted on it
+   println(parser.parseAll(parser.selection, selectionText))
+ }
+
+ // Parses a minimal rule with the `rule` parser and, if that succeeds, prints the desc of every
+ // source/target cache and final-cache expression of the RuleAnalyzer, then the group-by lists.
+ test("test rule analyzer") {
+   val parser = RuleParser()
+
+// val rules = "$source.tag == $target['take' >= 5] and $source.price + $source.price1 > $target['kk' < $target.age] and $source.ee = $target.fe + $target.a when $target.ggg = 1"
+   val ruleText = "$source.tag = $target.tag WHEN true"
+   val parsed = parser.parseAll(parser.rule, ruleText)
+   println(parsed)
+
+   // only touch parsed.get once the parse is known to have succeeded
+   if (parsed.successful) {
+     val analyzer = RuleAnalyzer(parsed.get)
+
+     println("source")
+     for (expr <- analyzer.sourceCacheExprs) println(expr.desc)
+     println("source final")
+     for (expr <- analyzer.sourceFinalCacheExprs) println(expr.desc)
+     println("target")
+     for (expr <- analyzer.targetCacheExprs) println(expr.desc)
+     println("target final")
+     for (expr <- analyzer.targetFinalCacheExprs) println(expr.desc)
+     println("groupby")
+     for (expr <- analyzer.sourceGroupbyExprs) println(expr)
+     for (expr <- analyzer.targetGroupbyExprs) println(expr)
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
new file mode 100644
index 0000000..7ed7f22
--- /dev/null
+++ b/measure/pom.xml
@@ -0,0 +1,193 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.griffin</groupId>
+ <artifactId>measure</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>measure-batch</module>
+ </modules>
+
+ <name>Apache Griffin :: Measures</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!-- Java language level; the maven-compiler-plugin configuration under build sets the same 1.8 values -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <!-- scala.binary.version is the suffix of the _${scala.binary.version} artifact ids in the dependencies -->
+ <scala.version>2.10.6</scala.version>
+ <spark.version>1.6.0</spark.version>
+ <scala.binary.version>2.10</scala.binary.version>
+ <!-- third-party library versions; avro.version is not referenced anywhere in this pom itself -->
+ <avro.version>1.7.7</avro.version>
+ <jackson.version>2.8.7</jackson.version>
+ <scalaj.version>2.3.0</scalaj.version>
+ <junit.version>4.11</junit.version>
+ <scalatest.version>2.2.4</scalatest.version>
+ <slf4j.version>1.7.21</slf4j.version>
+ <log4j.version>1.2.16</log4j.version>
+ </properties>
+
+ <dependencies>
+ <!--scala standard library-->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <!--spark core, streaming and hive are scope=provided; spark-streaming-kafka declares no scope (NOTE(review): presumably so that it gets bundled; confirm)-->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <!--jackson databind plus its scala module, both on jackson.version-->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <!--scalaj-http for http requests-->
+ <dependency>
+ <groupId>org.scalaj</groupId>
+ <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+ <version>${scalaj.version}</version>
+ </dependency>
+
+ <!--spark-avro data source; its version is hard-coded here rather than taken from a property-->
+ <dependency>
+ <groupId>com.databricks</groupId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
+ <version>2.0.1</version>
+ </dependency>
+ <!--spark-csv data source; version hard-coded as well-->
+ <dependency>
+ <groupId>com.databricks</groupId>
+ <artifactId>spark-csv_${scala.binary.version}</artifactId>
+ <version>1.5.0</version>
+ </dependency>
+
+ <!--logging: slf4j api, the slf4j-log4j12 binding and log4j itself (slf4j-simple is commented out)-->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <!--<dependency>-->
+ <!--<groupId>org.slf4j</groupId>-->
+ <!--<artifactId>slf4j-simple</artifactId>-->
+ <!--<version>${slf4j.version}</version>-->
+ <!--</dependency>-->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <!--junit (test scope)-->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!--scalatest (test scope)-->
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <version>${scalatest.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+   <plugins>
+     <!-- Java compilation. maven-compiler-plugin is declared exactly once, with groupId,
+          version and configuration together: Maven reports a second declaration of the same
+          groupId:artifactId under build/plugins as a duplicate plugin declaration. -->
+     <plugin>
+       <groupId>org.apache.maven.plugins</groupId>
+       <artifactId>maven-compiler-plugin</artifactId>
+       <version>3.5.1</version>
+       <configuration>
+         <source>1.8</source>
+         <target>1.8</target>
+       </configuration>
+     </plugin>
+
+     <!-- Scala compilation: the compile goal is bound to the compile phase. NOTE(review): no
+          testCompile execution is declared here; confirm the module poms cover src/test/scala. -->
+     <plugin>
+       <groupId>org.scala-tools</groupId>
+       <artifactId>maven-scala-plugin</artifactId>
+       <version>2.15.2</version>
+       <executions>
+         <execution>
+           <id>compile</id>
+           <goals>
+             <goal>compile</goal>
+           </goals>
+           <phase>compile</phase>
+         </execution>
+       </executions>
+     </plugin>
+
+     <!-- assembly plugin (jar-with-dependencies in the package phase), currently disabled -->
+     <!--<plugin>-->
+     <!--<artifactId>maven-assembly-plugin</artifactId>-->
+     <!--<configuration>-->
+     <!--<descriptorRefs>-->
+     <!--<descriptorRef>jar-with-dependencies</descriptorRef>-->
+     <!--</descriptorRefs>-->
+     <!--</configuration>-->
+     <!--<executions>-->
+     <!--<execution>-->
+     <!--<id>make-assembly</id>-->
+     <!--<phase>package</phase>-->
+     <!--<goals>-->
+     <!--<goal>single</goal>-->
+     <!--</goals>-->
+     <!--</execution>-->
+     <!--</executions>-->
+     <!--</plugin>-->
+   </plugins>
+ </build>
+</project>
\ No newline at end of file