You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/05/04 03:04:13 UTC
[05/51] [partial] incubator-griffin git commit: refactor arch
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
new file mode 100644
index 0000000..b0ab8b5
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
@@ -0,0 +1,16 @@
+package org.apache.griffin.measure.batch.config.validator
+
+import org.apache.griffin.measure.batch.config.params.Param
+
+import scala.util.Try
+
+// Placeholder validator: accepts every Param unconditionally (see fixme below).
+case class AllParamValidator() extends ParamValidator {
+
+ // Always yields Success(true) until real validation is implemented.
+ // NOTE(review): type parameter T is declared but unused (param is typed
+ // Param, not T) — presumably the signature should be `param: T`; confirm.
+ def validate[T <: Param](param: Param): Try[Boolean] = {
+ Try {
+ // fixme: not done, need to validate param
+ true
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
new file mode 100644
index 0000000..9dc9e60
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.measure.batch.config.validator
+
+import org.apache.griffin.measure.batch.log.Loggable
+import org.apache.griffin.measure.batch.config.params.Param
+
+import scala.util.Try
+
+// Contract for configuration-parameter validators; mixes in logging and is
+// Serializable so implementations can travel inside Spark closures.
+trait ParamValidator extends Loggable with Serializable {
+
+ // Validate the given param; Success(true) means the param is acceptable.
+ // NOTE(review): T is unused in the parameter list (param: Param, not T).
+ def validate[T <: Param](param: Param): Try[Boolean]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
new file mode 100644
index 0000000..4de22fb
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
@@ -0,0 +1,91 @@
+package org.apache.griffin.measure.batch.connector
+
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import com.databricks.spark.avro._
+
+import scala.util.{Success, Try}
+import java.nio.file.{Files, Paths}
+
+import org.apache.griffin.measure.batch.utils.HdfsUtil
+
+// DataConnector reading an Avro file (path assembled from the "file.path" and
+// "file.name" config entries) through spark-avro, turning each row into a
+// (groupby-key, cached-value-map) pair for the rule engine.
+case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any],
+ groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr],
+ finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any],
+ whenClauseOpt: Option[LogicalExpr]
+ ) extends DataConnector {
+
+ // config keys
+ val FilePath = "file.path"
+ val FileName = "file.name"
+
+ val filePath = config.getOrElse(FilePath, "").toString
+ val fileName = config.getOrElse(FileName, "").toString
+
+ // If a directory prefix is configured, prepend it. Note no separator is
+ // inserted between filePath and fileName — filePath must end with "/".
+ val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+
+ private def pathPrefix(): Boolean = {
+ filePath.nonEmpty
+ }
+
+ private def fileExist(): Boolean = {
+ HdfsUtil.existPath(concreteFileFullPath)
+ }
+
+ // Usable when a non-empty path is configured and it exists on HDFS.
+ def available(): Boolean = {
+ (!concreteFileFullPath.isEmpty) && fileExist
+ }
+
+ // Schema of the Avro file as (field-name, type-name) pairs.
+ def metaData(): Try[Iterable[(String, String)]] = {
+ Try {
+ val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+ st.fields.map(f => (f.name, f.dataType.typeName))
+ }
+ }
+
+ // Load the file and map each row to (groupby-key tuple, final cached map),
+ // dropping rows whose when-clause evaluates to Some(false).
+ def data(): Try[RDD[(Product, Map[String, Any])]] = {
+ Try {
+ loadDataFile.flatMap { row =>
+ // generate cache data
+ val cacheData: Map[String, Any] = cacheExprs.foldLeft(globalFinalCacheMap) { (cachedMap, expr) =>
+ CacheDataUtil.genCachedMap(Some(row), expr, cachedMap)
+ }
+ val finalCacheData = CacheDataUtil.filterCachedMap(finalCacheExprs, cacheData)
+
+ // when clause filter data source
+ val whenResult = whenClauseOpt match {
+ case Some(whenClause) => whenClause.calculate(finalCacheData)
+ case _ => None
+ }
+
+ // get groupby data
+ // Only an explicit Some(false) filters the row out; None (no clause, or
+ // an un-evaluable clause) lets the row through.
+ whenResult match {
+ case Some(false) => None
+ case _ => {
+ val groupbyData: Seq[AnyRef] = groupbyExprs.flatMap { expr =>
+ expr.calculate(finalCacheData) match {
+ case Some(v) => Some(v.asInstanceOf[AnyRef])
+ case _ => None
+ }
+ }
+ val key = toTuple(groupbyData)
+
+ Some((key, finalCacheData))
+ }
+ }
+ }
+ }
+ }
+
+ private def loadDataFile() = {
+ sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
+ }
+
+ // Build a scala.TupleN from the values via reflection. An empty seq yields
+ // None (which is itself a Product), so all such rows share one key.
+ private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+ if (as.size > 0) {
+ val tupleClass = Class.forName("scala.Tuple" + as.size)
+ tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+ } else None
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
new file mode 100644
index 0000000..884df5d
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
@@ -0,0 +1,63 @@
+package org.apache.griffin.measure.batch.connector
+
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.spark.sql.Row
+
+import scala.util.{Success, Try}
+
+// Helpers to evaluate rule expressions against a data row and accumulate the
+// results into an (expression-id -> value) cache map.
+object CacheDataUtil {
+
+ // for now, one expr only get one value, not supporting one expr get multiple values
+ // Any evaluation failure is swallowed and reported as None.
+ private def getCacheData(data: Option[Any], expr: Expr, cachedMap: Map[String, Any]): Option[Any] = {
+ Try {
+ expr match {
+ case selection: SelectionExpr => {
+ // apply each selector in order, threading the narrowed value through
+ selection.selectors.foldLeft(data) { (dt, selector) =>
+ getCacheData(dt, selector, cachedMap)
+ }
+ }
+ case selector: IndexFieldRangeSelectExpr => {
+ data match {
+ case Some(row: Row) => {
+ // only single-field selection (by index or by name) is supported
+ if (selector.fields.size == 1) {
+ selector.fields.head match {
+ case i: IndexDesc => Some(row.getAs[Any](i.index))
+ case f: FieldDesc => Some(row.getAs[Any](f.field))
+ case _ => None
+ }
+ } else None
+ }
+ case _ => None
+ }
+ }
+ case _ => expr.calculate(cachedMap)
+ }
+ } match {
+ case Success(v) => v
+ case _ => None
+ }
+ }
+
+ // Evaluate one expr and, if it yields a value, record it under expr._id.
+ def genCachedMap(data: Option[Any], expr: Expr, initialCachedMap: Map[String, Any]): Map[String, Any] = {
+ val valueOpt = getCacheData(data, expr, initialCachedMap)
+ if (valueOpt.nonEmpty) {
+ initialCachedMap + (expr._id -> valueOpt.get)
+ } else initialCachedMap
+ }
+
+ // Evaluate a batch of exprs against the same cache map.
+ // NOTE(review): the `data` parameter is ignored — each expr is evaluated
+ // with None instead of `data`; presumably this should forward `data`.
+ // Confirm intent before relying on this overload with a row.
+ def genCachedMap(data: Option[Any], exprs: Iterable[Expr], initialCachedMap: Map[String, Any]): Map[String, Any] = {
+ exprs.foldLeft(initialCachedMap) { (cachedMap, expr) =>
+ CacheDataUtil.genCachedMap(None, expr, cachedMap)
+ }
+ }
+
+ // Keep only the entries for exprs that can (re-)calculate a value from cachedMap.
+ def filterCachedMap(exprs: Iterable[Expr], cachedMap: Map[String, Any]): Map[String, Any] = {
+ exprs.foldLeft(Map[String, Any]()) { (newMap, expr) =>
+ val valueOpt = expr.calculate(cachedMap)
+ if (valueOpt.nonEmpty) {
+ newMap + (expr._id -> valueOpt.get)
+ } else newMap
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
new file mode 100644
index 0000000..9cc9be6
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
@@ -0,0 +1,16 @@
+package org.apache.griffin.measure.batch.connector
+
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+
+// Contract for batch data sources: report availability, expose schema
+// metadata, and deliver rows keyed by their groupby tuple.
+trait DataConnector extends Serializable {
+
+ // whether the underlying source (table / file) can actually be read
+ def available(): Boolean
+
+ // (field-name, type-name) pairs describing the source schema
+ def metaData(): Try[Iterable[(String, String)]]
+
+ // (groupby-key, cached-value-map) pairs for rule evaluation
+ def data(): Try[RDD[(Product, Map[String, Any])]]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..8f636c1
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
@@ -0,0 +1,35 @@
+package org.apache.griffin.measure.batch.connector
+
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.spark.sql.SQLContext
+
+import scala.util.Try
+
+// Builds the concrete DataConnector matching the configured connector type.
+object DataConnectorFactory {
+
+ // case-insensitive connector-type matchers
+ val HiveRegex = """^(?i)hive$""".r
+ val AvroRegex = """^(?i)avro$""".r
+
+ // Returns Failure for an unrecognized connector type.
+ def getDataConnector(sqlContext: SQLContext,
+ dataConnectorParam: DataConnectorParam,
+ groupbyExprs: Seq[MathExpr],
+ cacheExprs: Iterable[Expr],
+ finalCacheExprs: Iterable[Expr],
+ globalFinalCacheMap: Map[String, Any],
+ whenClauseOpt: Option[LogicalExpr]
+ ): Try[DataConnector] = {
+ val conType = dataConnectorParam.conType
+ // NOTE(review): `version` is read but never used — confirm whether
+ // version-specific connector selection was intended here.
+ val version = dataConnectorParam.version
+ Try {
+ conType match {
+ case HiveRegex() => HiveDataConnector(sqlContext, dataConnectorParam.config,
+ groupbyExprs, cacheExprs, finalCacheExprs, globalFinalCacheMap, whenClauseOpt)
+ case AvroRegex() => AvroDataConnector(sqlContext, dataConnectorParam.config,
+ groupbyExprs, cacheExprs, finalCacheExprs, globalFinalCacheMap, whenClauseOpt)
+ case _ => throw new Exception("connector creation error!")
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
new file mode 100644
index 0000000..a0fd414
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
@@ -0,0 +1,113 @@
+package org.apache.griffin.measure.batch.connector
+
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
+import scala.util.{Success, Try}
+
+// DataConnector over a Hive table: reads rows (optionally restricted to the
+// configured partitions) and turns each into (groupby-key, cached-value-map).
+case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any],
+ groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr],
+ finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any],
+ whenClauseOpt: Option[LogicalExpr]
+ ) extends DataConnector {
+
+ // config keys
+ val Database = "database"
+ val TableName = "table.name"
+ val Partitions = "partitions"
+
+ val database = config.getOrElse(Database, "").toString
+ val tableName = config.getOrElse(TableName, "").toString
+ val partitionsString = config.getOrElse(Partitions, "").toString
+
+ // qualify with the database unless it is empty or "default"
+ val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
+ // "a,b;c" -> partition alternatives split by ';', AND-ed conditions by ','
+ val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
+
+ private def dbPrefix(): Boolean = {
+ database.nonEmpty && !database.equals("default")
+ }
+
+ // A table name is configured and the table is visible to this SQLContext.
+ def available(): Boolean = {
+ (!tableName.isEmpty) && {
+ Try {
+ if (dbPrefix) {
+ sqlContext.tables(database).filter(tableExistsSql).collect.size
+ } else {
+ sqlContext.tables().filter(tableExistsSql).collect.size
+ }
+ } match {
+ case Success(s) => s > 0
+ case _ => false
+ }
+ }
+ }
+
+ // DESCRIBE output as (col-name, type) pairs, truncated at the first line
+ // starting with "# " (the partition-information section).
+ def metaData(): Try[Iterable[(String, String)]] = {
+ Try {
+ val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
+ val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# "))
+ if (partitionPos < 0) originRows
+ else originRows.take(partitionPos)
+ }
+ }
+
+ // Query the table and map each row to (groupby-key tuple, final cached map),
+ // dropping rows whose when-clause evaluates to Some(false).
+ def data(): Try[RDD[(Product, Map[String, Any])]] = {
+ Try {
+ sqlContext.sql(dataSql).flatMap { row =>
+ // generate cache data
+ val cacheData: Map[String, Any] = cacheExprs.foldLeft(globalFinalCacheMap) { (cachedMap, expr) =>
+ CacheDataUtil.genCachedMap(Some(row), expr, cachedMap)
+ }
+ val finalCacheData = CacheDataUtil.filterCachedMap(finalCacheExprs, cacheData)
+
+ // when clause filter data source
+ val whenResult = whenClauseOpt match {
+ case Some(whenClause) => whenClause.calculate(finalCacheData)
+ case _ => None
+ }
+
+ // get groupby data
+ // only an explicit Some(false) drops the row
+ whenResult match {
+ case Some(false) => None
+ case _ => {
+ val groupbyData: Seq[AnyRef] = groupbyExprs.flatMap { expr =>
+ expr.calculate(finalCacheData) match {
+ case Some(v) => Some(v.asInstanceOf[AnyRef])
+ case _ => None
+ }
+ }
+ val key = toTuple(groupbyData)
+
+ Some((key, finalCacheData))
+ }
+ }
+ }
+ }
+ }
+
+ // filter condition on the SQLContext.tables() DataFrame (column: tableName)
+ private def tableExistsSql(): String = {
+// s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql
+ s"tableName LIKE '${tableName}'"
+ }
+
+ private def metaDataSql(): String = {
+ s"DESCRIBE ${concreteTableName}"
+ }
+
+ // One SELECT per partition alternative, UNION ALL-ed; with no partitions
+ // configured this degenerates to a single unfiltered SELECT.
+ // NOTE(review): table/partition strings are interpolated into SQL unescaped —
+ // config values are implicitly trusted here.
+ private def dataSql(): String = {
+ val clauses = partitions.map { prtn =>
+ val cls = prtn.mkString(" AND ")
+ if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
+ else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
+ }
+ clauses.mkString(" UNION ALL ")
+ }
+
+ // Build a scala.TupleN via reflection; an empty seq yields None (itself a
+ // Product), so all such rows share a single key.
+ private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+ if (as.size > 0) {
+ val tupleClass = Class.forName("scala.Tuple" + as.size)
+ tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+ } else None
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
new file mode 100644
index 0000000..f73e86c
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
@@ -0,0 +1,25 @@
+package org.apache.griffin.measure.batch.log
+
+import org.slf4j.LoggerFactory
+
+// Mixin giving subclasses a lazily-created slf4j logger named after the
+// concrete class; @transient keeps it out of Spark closure serialization.
+trait Loggable {
+
+ @transient private lazy val logger = LoggerFactory.getLogger(getClass)
+
+ protected def info(msg: String): Unit = {
+ logger.info(msg)
+ }
+
+ protected def debug(msg: String): Unit = {
+ logger.debug(msg)
+ }
+
+ protected def warn(msg: String): Unit = {
+ logger.warn(msg)
+ }
+
+ protected def error(msg: String): Unit = {
+ logger.error(msg)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
new file mode 100644
index 0000000..7bff3b6
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
@@ -0,0 +1,119 @@
+package org.apache.griffin.measure.batch.persist
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+
+
+// Persist implementation writing run markers, results, logs and miss records
+// under <path>/<metricName>/<timeStamp>/ on HDFS.
+case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+ // config keys
+ val Path = "path"
+ val MaxPersistLines = "max.persist.lines"
+ val MaxLinesPerFile = "max.lines.per.file"
+
+ val path = config.getOrElse(Path, "").toString
+ val maxPersistLines = config.getOrElse(MaxPersistLines, -1).toString.toLong
+ val maxLinesPerFile = config.getOrElse(MaxLinesPerFile, 10000).toString.toLong
+
+ val separator = "/"
+
+ // well-known marker/output files of one run
+ val StartFile = filePath("_START")
+ val FinishFile = filePath("_FINISH")
+ val ResultFile = filePath("_RESULT")
+
+ val MissRecFile = filePath("_MISSREC") // optional
+
+ val LogFile = filePath("_LOG")
+
+ // one-shot flag: the first log() call prepends the persist header
+ // NOTE(review): mutable and not thread-safe — fine only if log() is
+ // called from a single thread; confirm.
+ var _init = true
+ private def isInit = {
+ val i = _init
+ _init = false
+ i
+ }
+
+ def available(): Boolean = {
+ (path.nonEmpty) && (maxPersistLines < Int.MaxValue)
+ }
+
+ // header line for the first log entry of the run
+ private def persistHead: String = {
+ val dt = new Date(timeStamp)
+ s"================ log of ${dt} ================\n"
+ }
+
+ private def timeHead(rt: Long): String = {
+ val dt = new Date(rt)
+ s"--- ${dt} ---\n"
+ }
+
+ protected def getFilePath(parentPath: String, fileName: String): String = {
+ if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName
+ }
+
+ protected def filePath(file: String): String = {
+ getFilePath(path, s"${metricName}/${timeStamp}/${file}")
+ }
+
+ protected def withSuffix(path: String, suffix: String): String = {
+ s"${path}.${suffix}"
+ }
+
+ def start(msg: String): Unit = {
+ HdfsUtil.writeContent(StartFile, msg)
+ }
+ def finish(): Unit = {
+ HdfsUtil.createEmptyFile(FinishFile)
+ }
+
+ // Write a human-readable result summary to _RESULT and the run log.
+ def result(rt: Long, result: Result): Unit = {
+ val resStr = result match {
+ case ar: AccuracyResult => {
+ s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
+ }
+ case _ => {
+ s"result: ${result}"
+ }
+ }
+ HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
+ log(rt, resStr)
+
+ info(resStr)
+ }
+
+ // need to avoid string too long
+ // Persist up to maxPersistLines miss records (all of them when the limit is
+ // negative), split into files of at most maxLinesPerFile lines each.
+ def missRecords(records: RDD[String]): Unit = {
+ val recordCount = records.count
+ val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+ if (count > 0) {
+ val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+ if (groupCount <= 1) {
+ // small enough: collect to the driver and write one file
+ val recs = records.take(count.toInt)
+ persistRecords(MissRecFile, recs)
+ } else {
+ // NOTE(review): this foreach runs on the executors, so the HdfsUtil
+ // writes happen there — confirm that is intended and serializable.
+ val groupedRecords: RDD[(Long, Iterable[String])] =
+ records.zipWithIndex.flatMap { r =>
+ val gid = r._2 / maxLinesPerFile
+ if (gid < groupCount) Some((gid, r._1)) else None
+ }.groupByKey()
+ groupedRecords.foreach { group =>
+ val (gid, recs) = group
+ val hdfsPath = if (gid == 0) MissRecFile else withSuffix(MissRecFile, gid.toString)
+ persistRecords(hdfsPath, recs)
+ }
+ }
+ }
+ }
+
+ private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = {
+ val recStr = records.mkString("\n")
+ HdfsUtil.appendContent(hdfsPath, recStr)
+ }
+
+ def log(rt: Long, msg: String): Unit = {
+ val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n"
+ HdfsUtil.appendContent(LogFile, logStr)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
new file mode 100644
index 0000000..5765927
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
@@ -0,0 +1,44 @@
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
+// Persist implementation posting accuracy metrics to an HTTP endpoint; all
+// other persist operations are intentional no-ops.
+case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+ // config keys
+ val Api = "api"
+ val Method = "method"
+
+ val api = config.getOrElse(Api, "").toString
+ val method = config.getOrElse(Method, "post").toString
+
+ def available(): Boolean = {
+ api.nonEmpty
+ }
+
+ def start(msg: String): Unit = {}
+ def finish(): Unit = {}
+
+ // Send an AccuracyResult as {metricName, timestamp, value} JSON; any other
+ // result type is only logged.
+ def result(rt: Long, result: Result): Unit = {
+ result match {
+ case ar: AccuracyResult => {
+ val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> ar.matchPercentage))
+ val data = JsonUtil.toJson(dataMap)
+
+ // post
+ val params = Map[String, Object]()
+ val header = Map[String, Object](("content-type" -> "application/json"))
+ val status = HttpUtil.httpRequest(api, method, params, header, data)
+ info(s"${method} to ${api} response status: ${status}")
+ }
+ case _ => {
+ info(s"result: ${result}")
+ }
+ }
+ }
+
+ def missRecords(records: RDD[String]): Unit = {}
+
+ def log(rt: Long, msg: String): Unit = {}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
new file mode 100644
index 0000000..2fa6942
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
@@ -0,0 +1,27 @@
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
+// Composite Persist that fans every operation out to each wrapped Persist.
+case class MultiPersists(persists: Iterable[Persist]) extends Persist {
+
+ // timestamp borrowed from the first delegate, 0 when empty
+ // NOTE(review): `case Nil` only matches List-backed Iterables; a non-List
+ // empty Iterable would fall through to persists.head and throw — confirm.
+ val timeStamp: Long = persists match {
+ case Nil => 0
+ case _ => persists.head.timeStamp
+ }
+
+ val config: Map[String, Any] = Map[String, Any]()
+
+ // available when at least one delegate is available
+ def available(): Boolean = { persists.exists(_.available()) }
+
+ def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
+ def finish(): Unit = {persists.foreach(_.finish())}
+
+ def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) }
+
+ def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) }
+
+ def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
new file mode 100644
index 0000000..7398c24
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
@@ -0,0 +1,23 @@
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.log.Loggable
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+
+
+// Contract for persisting one measurement run (start/finish markers, results,
+// miss records, logs); Serializable so instances can be used in Spark jobs.
+trait Persist extends Loggable with Serializable {
+ // timestamp identifying the run this persist belongs to
+ val timeStamp: Long
+
+ val config: Map[String, Any]
+
+ // whether this persist target is usable with its configuration
+ def available(): Boolean
+
+ def start(msg: String): Unit
+ def finish(): Unit
+
+ // persist a calculated result at runtime rt
+ def result(rt: Long, result: Result): Unit
+
+ // persist the records that failed to match
+ def missRecords(records: RDD[String]): Unit
+
+ def log(rt: Long, msg: String): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
new file mode 100644
index 0000000..84dc4ce
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
@@ -0,0 +1,30 @@
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.config.params.env._
+
+import scala.util.{Success, Try}
+
+
+// Builds the configured Persist targets for a metric, bundling them into a
+// MultiPersists per run timestamp.
+case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {
+
+ // case-insensitive persist-type matchers
+ val HDFS_REGEX = """^(?i)hdfs$""".r
+ val HTTP_REGEX = """^(?i)http$""".r
+
+ def getPersists(timeStamp: Long): MultiPersists = {
+ MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
+ }
+
+ // Create one persist if its type is known and it reports available.
+ // NOTE(review): the unknown-type throw happens outside any Try, so an
+ // unsupported persist type aborts getPersists instead of yielding None —
+ // inconsistent with the construction failures, which are swallowed; confirm.
+ private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
+ val persistConfig = persistParam.config
+ val persistTry = persistParam.persistType match {
+ case HDFS_REGEX() => Try(HdfsPersist(persistConfig, metricName, timeStamp))
+ case HTTP_REGEX() => Try(HttpPersist(persistConfig, metricName, timeStamp))
+ case _ => throw new Exception("not supported persist type")
+ }
+ persistTry match {
+ case Success(persist) if (persist.available) => Some(persist)
+ case _ => None
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
new file mode 100644
index 0000000..55505ac
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
@@ -0,0 +1,26 @@
+package org.apache.griffin.measure.batch.result
+
+
+// Accuracy measurement outcome: `miss` of `total` records failed to match.
+case class AccuracyResult(miss: Long, total: Long) extends Result {
+
+ type T = AccuracyResult
+
+ // Take the updated miss count from delta while keeping this result's total.
+ def update(delta: T): T = {
+ AccuracyResult(delta.miss, total)
+ }
+
+ // final (stable) once nothing is missing any more
+ def eventual(): Boolean = {
+ this.miss <= 0
+ }
+
+ def differsFrom(other: T): Boolean = {
+ (this.miss != other.miss) || (this.total != other.total)
+ }
+
+ def getMiss = miss
+ def getTotal = total
+ def getMatch = total - miss
+
+ // percentage of matched records; 0 when total is non-positive
+ def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
new file mode 100644
index 0000000..8d529db
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
@@ -0,0 +1,14 @@
+package org.apache.griffin.measure.batch.result
+
+
+// Base type for measurement results; T is the concrete self type so update
+// and comparison stay type-safe per result kind.
+trait Result extends Serializable {
+
+ type T <: Result
+
+ // merge a delta result into this one
+ def update(delta: T): T
+
+ // whether the result has reached its final (stable) state
+ def eventual(): Boolean
+
+ def differsFrom(other: T): Boolean
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
new file mode 100644
index 0000000..62df447
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
@@ -0,0 +1,39 @@
+package org.apache.griffin.measure.batch.result
+
+
+// Well-known result fields: each carries its map key, its value type T, and
+// the persisted type name; wrap builds the (key -> value) pair.
+sealed trait ResultInfo {
+ type T
+ val key: String
+ val tp: String
+ def wrap(value: T) = (key -> value)
+}
+
+final case object TimeGroupInfo extends ResultInfo {
+ type T = Long
+ val key = "__time__"
+ val tp = "bigint"
+}
+
+final case object NextFireTimeInfo extends ResultInfo {
+ type T = Long
+ val key = "__next_fire_time__"
+ val tp = "bigint"
+}
+
+final case object MismatchInfo extends ResultInfo {
+ type T = String
+ val key = "__mismatch__"
+ val tp = "string"
+}
+
+final case object TargetInfo extends ResultInfo {
+ type T = Map[String, Any]
+ val key = "__target__"
+ val tp = "map"
+}
+
+final case object ErrorInfo extends ResultInfo {
+ type T = String
+ val key = "__error__"
+ val tp = "string"
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
new file mode 100644
index 0000000..78665f9
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
@@ -0,0 +1,28 @@
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.rule.expr._
+
+// Splits a parsed rule statement into the expression sets needed per data
+// side (source / target / global): cache, persist, final-cache, groupby, when.
+case class RuleAnalyzer(rule: StatementExpr) extends Serializable {
+
+ // data-source tags used when querying the rule for expressions
+ val GlobalData = ""
+ val SourceData = "source"
+ val TargetData = "target"
+
+ val globalCacheExprs: Iterable[Expr] = rule.getCacheExprs(GlobalData)
+ val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
+ val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)
+
+ val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
+ val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)
+
+ // final caches also retain everything to be persisted (deduplicated via toSet)
+ val globalFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(GlobalData).toSet
+ val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet
+ val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet
+
+ // paired groupby expressions: _1 evaluated on source, _2 on target
+ val groupbyExprPairs: Seq[(MathExpr, MathExpr)] = rule.getGroupbyExprPairs((SourceData, TargetData))
+ val sourceGroupbyExprs: Seq[MathExpr] = groupbyExprPairs.map(_._1)
+ val targetGroupbyExprs: Seq[MathExpr] = groupbyExprPairs.map(_._2)
+
+ val whenClauseExpr: Option[LogicalExpr] = rule.getWhenClauseExpr
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
new file mode 100644
index 0000000..cc3e8b3
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
@@ -0,0 +1,34 @@
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.config.params.user._
+
+import scala.util.Failure
+//import org.apache.griffin.measure.batch.rule.expr_old._
+import org.apache.griffin.measure.batch.rule.expr._
+
+import scala.util.{Success, Try}
+
+
// Builds a StatementExpr rule tree from the user-supplied rule text.
case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {

  val ruleParser: RuleParser = RuleParser()

  /** Parses the rule text from the evaluate-rule parameters.
    * @return the parsed statement expression
    * @throws Exception when the rule text cannot be parsed
    */
  def generateRule(): StatementExpr = {
    val rules = evaluateRuleParam.rules
    parseExpr(rules) match {
      case Success(se) => se
      case Failure(ex) => throw ex
    }
  }

  // Runs the combinator parser over the whole input. A non-successful parse is
  // turned into a Failure that carries the parser's own diagnostic (position
  // and message) instead of a bare "parse rule error!", so users can see where
  // their rule text is wrong.
  private def parseExpr(rules: String): Try[StatementExpr] = {
    Try {
      val result = ruleParser.parseAll(ruleParser.rule, rules)
      if (result.successful) result.get
      else throw new Exception(s"parse rule error! ${result.toString}")
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
new file mode 100644
index 0000000..996e808
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
@@ -0,0 +1,298 @@
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.rule.expr._
+
+import scala.util.parsing.combinator._
+
// Parser-combinator grammar for griffin batch rules. NOTE: the ordering of the
// alternatives below is significant (e.g. DoubleNumber before IntegerNumber,
// literal time before number) — do not reorder without re-checking the grammar.
case class RuleParser() extends JavaTokenParsers with Serializable {

  /**
    * BNF representation for grammar as below:
    *
    * <rule> ::= <logical-statement> [WHEN <logical-statement>]
    * rule: mapping-rule [WHEN when-rule]
    * - mapping-rule: the first level opr should better not be OR | NOT, otherwise it can't automatically find the groupby column
    * - when-rule: only contain the general info of data source, not the special info of each data row
    *
    * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
    * logical-statement: return boolean value
    * logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
    *
    * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
    * logical-expression example: $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
    *
    * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
    * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
    * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
    * range-expr example: ('3214', '4312', '60821'), (10, 15), ()
    *
    * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
    * math-expr example: $source.price * $target.count, "hello" + " " + "world" + 123
    *
    * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
    * <unary-opr> ::= "+" | "-"
    *
    * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
    *
    * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
    * selection example: $source.price, $source.json(), $source['state'], $source.numList[3], $target.json().mails['org' = 'apache'].names[*]
    *
    * <selection-head> ::= $source | $target
    *
    * <field-sel> ::= "." <field-string>
    *
    * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
    * <function-name> ::= <name-string>
    * <arg> ::= <math-expr>
    *
    * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
    * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
    * index-field-range: 2 means the 3rd item, (0, 3) means first 4 items, * means all items, 'age' means item 'age'
    * <index-field> ::= <index> | <field-quote> | <all-selection>
    * index: 0 ~ n means position from start, -1 ~ -n means position from end
    * <field-quote> ::= ' <field-string> ' | " <field-string> "
    *
    * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
    * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
    * filter-sel example: ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5 ]
    *
    * When <math-expr> in the selection, it mustn't contain the different <selection-head>, for example:
    * $source.tags[1+2] valid
    * $source.tags[$source.first] valid
    * $source.tags[$target.first] invalid
    * -- Such job is for validation, not for parser
    *
    *
    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean>
    * <literal-string> ::= <any-string>
    * <literal-number> ::= <integer> | <double>
    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
    * <literal-boolean> ::= true | false
    *
    */

  // reserved words; all case-insensitive regexes
  object Keyword {
    def WhenKeywords: Parser[String] = """(?i)when""".r
    def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
    def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
    def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
    def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
    def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
  }
  import Keyword._

  // operator tokens; word forms and symbol forms are interchangeable
  object Operator {
    def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
    def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
    def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
    def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
    def RangeOpr: Parser[String] = RangeKeywords

    def UnaryMathOpr: Parser[String] = "+" | "-"
    def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
    def BinaryMathOpr2: Parser[String] = "+" | "-"

    def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r

    def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
    def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
    def Dot: Parser[String] = "."
    def AllSelection: Parser[String] = "*"
    def SQuote: Parser[String] = "'"
    def DQuote: Parser[String] = "\""
    def Comma: Parser[String] = ","
  }
  import Operator._

  // string token classes
  object SomeString {
    def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
    def SimpleFieldString: Parser[String] = """\w+""".r
    def FieldString: Parser[String] = """[\w\s]+""".r
    def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
  }
  import SomeString._

  // number token classes
  object SomeNumber {
    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
    def IndexNumber: Parser[String] = IntegerNumber
  }
  import SomeNumber._

  // -- literal --
  // NOTE(review): "literial" spelling kept as-is — renaming would touch code tokens
  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean
  def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
  def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
  def literialTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
  def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }

  // -- selection --
  // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
    case head ~ selectors => SelectionExpr(head, selectors)
  }
  def selector: Parser[SelectExpr] = (fieldSelect | functionOperation | indexFieldRangeSelect | filterSelect)

  def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
  // <field-sel> ::= "." <field-string>
  // a plain field access is represented as a single-field range selection
  def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> SimpleFieldString ^^ {
    case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
  }
  // <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
  def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
    case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
  }
  def argument: Parser[MathExpr] = mathExpr
  // <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
  def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
    case ifrs => IndexFieldRangeSelectExpr(ifrs)
  }
  // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
  def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
    case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
  }
  // <index-field> ::= <index> | <field-quote> | <all-selection>
  // *here it can parse <math-expr>, but for simple situation, not supported now*
  def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
  // <field-quote> ::= ' <field-string> ' | " <field-string> "
  def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
  // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
  def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote ~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
    case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
  }

  // -- math --
  // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
  def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
  // <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
  // <unary-opr> ::= "+" | "-"
  def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryMathExpr(list, a)
  }
  // <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
  // two precedence levels: *, /, % bind tighter than +, -
  def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ unaryMathExpr) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
  }
  def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 ~ binaryMathExpr1) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
  }
  def mathExpr: Parser[MathExpr] = binaryMathExpr2

  // -- logical expression --
  // <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
  def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) <~ BracketPair._2 ^^ { RangeDesc(_) }
  // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
  def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
    case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
  } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
    case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
  } | mathExpr ^^ { LogicalSimpleExpr(_) }

  // -- logical statement --
  // precedence: NOT binds tighter than AND, which binds tighter than OR
  def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> logicalStatement <~ BracketPair._2
  def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ logicalFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryLogicalExpr(list, a)
  }
  def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
  }
  def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
  }
  // <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
  def logicalStatement: Parser[LogicalExpr] = orLogicalStatement

  // -- rule --
  // <rule> ::= <logical-statement> [WHEN <logical-statement>]
  def rule: Parser[StatementExpr] = logicalStatement ~ opt(WhenKeywords ~> logicalStatement) ^^ {
    case ls ~ Some(ws) => WhenClauseStatementExpr(ls, ws)
    case ls ~ _ => SimpleStatementExpr(ls)
  }

  // for compile only
//  case class NullStatementExpr(expression: String) extends StatementExpr {
//    def genValue(values: Map[String, Any]): Option[Any] = None
//    def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = Nil
//  }
//  def statementsExpr = mathExpr ^^ { NullStatementExpr(_) }


  // legacy grammar kept for reference; superseded by the rules above
//
//  // basic
//  val anyString: Parser[String] = """[^'{}\[\]()=<>.$@;+\-*/\\\"]*""".r
//  val variable: Parser[String] = """[a-zA-Z_]\w*""".r
//  val number: Parser[String] = """[+\-]?\d+""".r
//  val time: Parser[String] = """\d+(y|M|w|d|h|m|s|ms)""".r
//
//  val numPosition: Parser[String] = """\d+""".r
//  val anyPosition: Parser[String] = "*"
//
//  val filterOpr: Parser[String] = "=" | "!=" | ">" | "<" | ">=" | "<="
//
//  val opr1: Parser[String] = "*" | "/" | "%"
//  val opr2: Parser[String] = "+" | "-"
//
//  val assignOpr: Parser[String] = "="
//  val compareOpr: Parser[String] = "==" | "!=" | ">" | "<" | ">=" | "<="
//  val mappingOpr: Parser[String] = "==="
//
//  val exprSep: Parser[String] = ";"
//
//  // simple
//  def variableString: Parser[VariableExpr] = (("'" ~> anyString <~ "'") | ("\"" ~> anyString <~ "\"")) ^^ { VariableStringExpr(_) }
//  def constString: Parser[ConstExpr] = (("'" ~> anyString <~ "'") | ("\"" ~> anyString <~ "\"")) ^^ { ConstStringExpr(_) }
//  def constValue: Parser[ConstExpr] = time ^^ { ConstTimeExpr(_) } | number ^^ { ConstNumberExpr(_)} | constString
//  def variableValue: Parser[VariableExpr] = variable ^^ { VariableStringExpr(_) }
//  def quoteVariableValue: Parser[QuoteVariableExpr] = "${" ~> variable <~ "}" ^^ { QuoteVariableExpr(_) }
//  def position: Parser[SelectExpr] = anyPosition ^^ { AnyPositionExpr(_) } | """\d+""".r ^^ { NumPositionExpr(_) } | (("'" ~> anyString <~ "'") | ("\"" ~> anyString <~ "\"")) ^^ { StringPositionExpr(_) }
//  def argument: Parser[ConstExpr] = constValue
//  def annotationExpr: Parser[AnnotationExpr] = "@" ~> variable ^^ { AnnotationExpr(_) }
//
//  // selector
//  def filterOpration: Parser[SelectExpr] = (variableString ~ filterOpr ~ constString) ^^ {
//    case v ~ opr ~ c => FilterOprExpr(opr, v, c)
//  }
//  def positionExpr: Parser[SelectExpr] = "[" ~> (filterOpration | position) <~ "]"
//  def functionExpr: Parser[SelectExpr] = "." ~ variable ~ "(" ~ repsep(argument, ",") ~ ")" ^^ {
//    case _ ~ v ~ _ ~ args ~ _ => FunctionExpr(v, args)
//  }
//  def selectorExpr: Parser[SelectExpr] = positionExpr | functionExpr
//
//  // data
//  def selectorsExpr: Parser[DataExpr] = quoteVariableValue ~ rep(selectorExpr) ^^ {
//    case q ~ tails => SelectionExpr(q, tails)
//  }
//
//  // calculation
//  def factor: Parser[ElementExpr] = (constValue | selectorsExpr | "(" ~> expr <~ ")") ^^ { FactorExpr(_) }
//  def term: Parser[ElementExpr] = factor ~ rep(opr1 ~ factor) ^^ {
//    case a ~ Nil => a
//    case a ~ list => CalculationExpr(a, list.map(c => (c._1, c._2)))
//  }
//  def expr: Parser[ElementExpr] = term ~ rep(opr2 ~ term) ^^ {
//    case a ~ Nil => a
//    case a ~ list => CalculationExpr(a, list.map(c => (c._1, c._2)))
//  }
//
//  // statement
//  def assignExpr: Parser[StatementExpr] = variableValue ~ assignOpr ~ expr ^^ {
//    case v ~ opr ~ c => AssignExpr(opr, v, c)
//  }
//  def conditionExpr: Parser[StatementExpr] = rep(annotationExpr) ~ expr ~ compareOpr ~ expr ^^ {
//    case anos ~ le ~ opr ~ re => ConditionExpr(opr, le, re, anos)
//  }
//  def mappingExpr: Parser[StatementExpr] = rep(annotationExpr) ~ expr ~ mappingOpr ~ expr ^^ {
//    case anos ~ le ~ opr ~ re => MappingExpr(opr, le, re, anos)
//  }
//  def statementExpr: Parser[StatementExpr] = assignExpr | conditionExpr | mappingExpr
//
//  // statements
//  def statementsExpr: Parser[StatementExpr] = repsep(statementExpr, exprSep) ^^ { StatementsExpr(_) }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
new file mode 100644
index 0000000..0d54707
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
@@ -0,0 +1,7 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+
// Mixin for expressions that RuleAnalyzer can inspect: exposes group-by key
// pairs and the optional WHEN clause found inside the expression tree.
trait AnalyzableExpr extends Serializable {
  // (source expr, target expr) pairs usable as join keys; default: none
  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = Nil
  // the WHEN clause sub-expression, if this expression carries one; default: none
  def getWhenClauseExpr(): Option[LogicalExpr] = None
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
new file mode 100644
index 0000000..e062376
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
@@ -0,0 +1,15 @@
+package org.apache.griffin.measure.batch.rule.expr
+
// Mixin describing whether an expression's value may be cached per data source
// and whether it must be persisted for reporting.
trait Cacheable extends DataSourceable {
  // whether this node itself is a caching unit; subclasses opt in by overriding
  protected def cacheUnit: Boolean = false
  // cacheable for data source `ds` when: it is a cache unit, it does not mix
  // multiple data sources (no conflict), and it belongs to `ds` — or both the
  // requested ds and this node's data sources are empty (global scope)
  def cacheable(ds: String): Boolean = {
    cacheUnit && !conflict() && ((ds.isEmpty && dataSources.isEmpty) || (ds.nonEmpty && contains(ds)))
  }
  protected def getCacheExprs(ds: String): Iterable[Cacheable]

  // whether this node's value should be persisted; subclasses opt in
  protected def persistUnit: Boolean = false
  // persistable uses the same scoping rule as cacheable but allows mixed sources
  def persistable(ds: String): Boolean = {
    persistUnit && ((ds.isEmpty && dataSources.isEmpty) || (ds.nonEmpty && contains(ds)))
  }
  protected def getPersistExprs(ds: String): Iterable[Cacheable]
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
new file mode 100644
index 0000000..8018c19
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
@@ -0,0 +1,7 @@
+package org.apache.griffin.measure.batch.rule.expr
+
// Mixin for anything that can be evaluated against a map of pre-computed
// values; returns None when the value cannot be produced.
trait Calculatable extends Serializable {

  def calculate(values: Map[String, Any]): Option[Any]

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
new file mode 100644
index 0000000..f18798a
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.measure.batch.rule.expr
+
// Mixin tracking which data sources (e.g. "source", "target") an expression
// references.
trait DataSourceable extends Serializable {
  val dataSources: Set[String]
  // an expression referencing more than one data source cannot be cached per source
  protected def conflict(): Boolean = dataSources.size > 1
  def contains(ds: String): Boolean = dataSources.contains(ds)
  // the single data source, when exactly one is referenced
  def dataSourceOpt: Option[String] = {
    if (dataSources.size == 1) Some(dataSources.head) else None
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
new file mode 100644
index 0000000..38758a2
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
@@ -0,0 +1,15 @@
+package org.apache.griffin.measure.batch.rule.expr
+
// Mixin providing a human-readable description for rule elements.
trait Describable extends Serializable {

  // human-readable description of this element
  val desc: String

  // Renders a value for inclusion in a description: nested describables use
  // their own desc, strings are single-quoted, everything else via toString.
  protected def describe(v: Any): String = v match {
    case d: Describable => d.desc
    case str: String => s"'${str}'"
    case other => other.toString
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
new file mode 100644
index 0000000..d7810aa
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
@@ -0,0 +1,33 @@
+package org.apache.griffin.measure.batch.rule.expr
+
// Base trait for all rule expressions: describable, cacheable, calculatable,
// and carrying a unique id used as the key into the values map.
trait Expr extends Serializable with Describable with Cacheable with Calculatable {

  // subclasses may override to supply a user-visible id; empty means "generate one"
  protected val _defaultId: String = ExprIdCounter.emptyId

  // unique id for this expression instance, allocated at construction
  val _id = ExprIdCounter.genId(_defaultId)

  // cache expressions of this node's children; default: none
  protected def getSubCacheExprs(ds: String): Iterable[Expr] = Nil
  // all cacheable expressions in this subtree, including this node if cacheable
  final def getCacheExprs(ds: String): Iterable[Expr] = {
    if (cacheable(ds)) getSubCacheExprs(ds).toList :+ this else getSubCacheExprs(ds)
  }

  protected def getSubFinalCacheExprs(ds: String): Iterable[Expr] = Nil
  // top-most cacheable expressions only: once a node is cacheable its children
  // are not collected (they are covered by the parent's cached value)
  final def getFinalCacheExprs(ds: String): Iterable[Expr] = {
    if (cacheable(ds)) Nil :+ this else getSubFinalCacheExprs(ds)
  }

  protected def getSubPersistExprs(ds: String): Iterable[Expr] = Nil
  // all persistable expressions in this subtree, including this node if persistable
  final def getPersistExprs(ds: String): Iterable[Expr] = {
    if (persistable(ds)) getSubPersistExprs(ds).toList :+ this else getSubPersistExprs(ds)
  }

  // looks up a previously cached value by _id first; falls back to computing
  final def calculate(values: Map[String, Any]): Option[Any] = {
    values.get(_id) match {
      case Some(v) => Some(v)
      case _ => calculateOnly(values)
    }
  }
  // actual computation, implemented by subclasses; bypasses the value cache
  protected def calculateOnly(values: Map[String, Any]): Option[Any]

}
+
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
new file mode 100644
index 0000000..0bb5085
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
@@ -0,0 +1,22 @@
+package org.apache.griffin.measure.batch.rule.expr
+
// Marker for description-only elements: they describe part of a rule but are
// not themselves calculatable expressions.
trait ExprDescOnly extends Describable {

}
+
+
// Head of a selection, e.g. "$source" or "$target"; normalizes the name.
case class SelectionHead(expr: String) extends ExprDescOnly {
  // pattern for "$name"-style heads
  private val HeadPattern = """\$(\w+)""".r
  // strip the leading '$' and lower-case the name; keep the raw text when the
  // input does not match the "$name" shape
  val head: String = HeadPattern.unapplySeq(expr) match {
    case Some(List(name)) => name.toLowerCase
    case _ => expr
  }
  val desc: String = "$" + head
}
+
// Describes a range/tuple of expressions, rendered as "(a, b, c)".
case class RangeDesc(elements: Iterable[MathExpr]) extends ExprDescOnly {
  val desc: String = elements.map(_.desc).mkString("(", ", ", ")")
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
new file mode 100644
index 0000000..56e7daa
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
@@ -0,0 +1,42 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.{Set => MutableSet}
+
// Allocates unique ids for expressions: user-supplied ids are registered and
// reused; empty or purely numeric defaults get a generated counter id.
object ExprIdCounter {

  private val idCounter: AtomicLong = new AtomicLong(0L)

  // ids explicitly supplied by users, kept to detect re-use
  private val existIdSet: MutableSet[String] = MutableSet.empty[String]

  // purely numeric ids are reserved for generated ids
  private val invalidIdRegex = """^\d+$""".r

  val emptyId: String = ""

  /** Returns an id for the given default: a fresh numeric id when the default
    * is empty or numeric, otherwise the user id itself (registered on first use).
    */
  def genId(defaultId: String): String = {
    defaultId match {
      // BUG FIX: the original `case emptyId =>` used a bare lowercase name,
      // which is a variable pattern that binds (not compares) and therefore
      // matched EVERY input, making all later cases unreachable. Backquotes
      // force a stable-identifier comparison against the val `emptyId`.
      case `emptyId` => increment().toString
      case invalidIdRegex() => increment().toString
//      case defId if (exist(defId)) => s"${increment}#${defId}"
      case defId if (exist(defId)) => s"${defId}"
      case _ => {
        insertUserId(defaultId)
        defaultId
      }
    }
  }

  private def exist(id: String): Boolean = {
    existIdSet.contains(id)
  }

  private def insertUserId(id: String): Unit = {
    existIdSet += id
  }

  // side-effecting: advances the shared counter; keep the parens at call sites
  private def increment(): Long = {
    idCounter.incrementAndGet()
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
new file mode 100644
index 0000000..f13f15a
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
@@ -0,0 +1,40 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import scala.util.{Success, Try}
+
// Marker for field descriptors used inside index/field-range selections.
trait FieldDescOnly extends Describable with DataSourceable {

}
+
// Numeric index descriptor, e.g. "3" or "-1"; fails fast at construction on
// non-integer input.
case class IndexDesc(expr: String) extends FieldDescOnly {
  val index: Int =
    Try(expr.toInt).getOrElse(throw new Exception(s"${expr} is invalid index"))
  val desc: String = describe(index)
  val dataSources: Set[String] = Set.empty[String]
}
+
// Named field descriptor, e.g. 'age'; desc single-quotes the name via describe.
case class FieldDesc(expr: String) extends FieldDescOnly {
  val field: String = expr
  val desc: String = describe(field)
  val dataSources: Set[String] = Set.empty[String]
}
+
// Wildcard descriptor ("*") selecting all fields/items.
case class AllFieldsDesc(expr: String) extends FieldDescOnly {
  val allFields: String = expr
  val desc: String = allFields
  val dataSources: Set[String] = Set.empty[String]
}
+
// Index range descriptor "(start, end)"; only numeric index endpoints are
// valid — any other endpoint type fails at construction.
case class FieldRangeDesc(startField: FieldDescOnly, endField: FieldDescOnly) extends FieldDescOnly {
  val desc: String = {
    (startField, endField) match {
      case (f1: IndexDesc, f2: IndexDesc) => s"(${f1.desc}, ${f2.desc})"
      case _ => throw new Exception("invalid field range description")
    }
  }
  val dataSources: Set[String] = Set.empty[String]
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
new file mode 100644
index 0000000..020ddc2
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
@@ -0,0 +1,68 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import scala.util.{Failure, Success, Try}
+
// Base for literal expressions: the value is fixed at construction, so
// calculation ignores the values map and references no data sources.
trait LiteralExpr extends Expr {
  val value: Option[Any]
  def calculateOnly(values: Map[String, Any]): Option[Any] = value
  val dataSources: Set[String] = Set.empty[String]
}
+
// String literal; quotes are already stripped by the parser.
case class LiteralStringExpr(expr: String) extends LiteralExpr {
  val value: Option[String] = Some(expr)
  // NOTE(review): desc is the raw string without quotes, unlike describe()
  // which would single-quote it — confirm this asymmetry is intended
  val desc: String = value.getOrElse("")
}
+
// Numeric literal: a decimal point selects Double parsing, otherwise Long.
// Invalid input fails fast at construction.
case class LiteralNumberExpr(expr: String) extends LiteralExpr {
  val value: Option[Any] = {
    val parsed = if (expr.contains(".")) Try(expr.toDouble) else Try(expr.toLong)
    parsed match {
      case Success(v) => Some(v)
      case _ => throw new Exception(s"${expr} is invalid number")
    }
  }
  val desc: String = value.getOrElse("").toString
}
+
// Time literal such as "5m" or "100ms"; value is the duration in milliseconds.
// Invalid input fails fast at construction.
case class LiteralTimeExpr(expr: String) extends LiteralExpr {
  final val TimeRegex = """(\d+)(d|h|m|s|ms)""".r
  // milliseconds per supported unit
  private val unitMillis: Map[String, Long] =
    Map("d" -> 86400000L, "h" -> 3600000L, "m" -> 60000L, "s" -> 1000L, "ms" -> 1L)
  val value: Option[Long] = expr match {
    case TimeRegex(time, unit) =>
      unitMillis.get(unit) match {
        case Some(factor) => Some(time.toLong * factor)
        case _ => throw new Exception(s"${expr} is invalid time format")
      }
    case _ => throw new Exception(s"${expr} is invalid time format")
  }
  val desc: String = expr
}
+
// Boolean literal, matched case-insensitively; anything other than true/false
// fails at construction.
case class LiteralBooleanExpr(expr: String) extends LiteralExpr {
  final val TrueRegex = """(?i)true""".r
  final val FalseRegex = """(?i)false""".r
  val value: Option[Boolean] = expr match {
    case TrueRegex() => Some(true)
    case FalseRegex() => Some(false)
    case _ => throw new Exception(s"${expr} is invalid boolean")
  }
  val desc: String = value.getOrElse("").toString
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
new file mode 100644
index 0000000..611da38
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
@@ -0,0 +1,159 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import org.apache.griffin.measure.batch.utils.CalculationUtil._
+
+// Base trait for boolean-valued rule expressions. Logical expressions are
+// cacheable calculation units (cacheUnit = true) so their results can be
+// reused during rule evaluation.
+trait LogicalExpr extends Expr with AnalyzableExpr {
+ override def cacheUnit: Boolean = true
+}
+
+// Adapter wrapping a single MathExpr as a logical expression; all behavior
+// is delegated to the wrapped expression, and caching is disabled since the
+// inner expression handles its own caching.
+case class LogicalSimpleExpr(expr: MathExpr) extends LogicalExpr {
+ def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+ val desc: String = expr.desc
+ val dataSources: Set[String] = expr.dataSources
+ override def cacheUnit: Boolean = false
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = expr.getCacheExprs(ds)
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = expr.getFinalCacheExprs(ds)
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = expr.getPersistExprs(ds)
+}
+
+// Comparison of two math expressions with one of: = == != !== > >= < <=.
+case class LogicalCompareExpr(left: MathExpr, compare: String, right: MathExpr) extends LogicalExpr {
+ // "==?" matches "=" or "=="; "!==?" matches "!=" or "!=="
+ private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=")
+ def calculateOnly(values: Map[String, Any]): Option[Any] = {
+ // comparison operators on Option values come from CalculationUtil implicits
+ val (lv, rv) = (left.calculate(values), right.calculate(values))
+ compare match {
+ case this.eqOpr() => lv === rv
+ case this.neqOpr() => lv =!= rv
+ case this.btOpr => lv > rv
+ case this.bteOpr => lv >= rv
+ case this.ltOpr => lv < rv
+ case this.lteOpr => lv <= rv
+ case _ => None
+ }
+ }
+ val desc: String = s"${left.desc} ${compare} ${right.desc}"
+ val dataSources: Set[String] = left.dataSources ++ right.dataSources
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ left.getCacheExprs(ds) ++ right.getCacheExprs(ds)
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ left.getFinalCacheExprs(ds) ++ right.getFinalCacheExprs(ds)
+ }
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ left.getPersistExprs(ds) ++ right.getPersistExprs(ds)
+ }
+
+ // For equality comparisons only, emit the (left, right) pair ordered as
+ // (source1 expr, source2 expr) for use as a group-by join key.
+ override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = {
+ if (compare == "=" || compare == "==") {
+ (left.dataSourceOpt, right.dataSourceOpt) match {
+ // stable-identifier patterns: each case matches by equality against
+ // Some(dsPair._1) / Some(dsPair._2), not by binding fresh variables
+ case (Some(dsPair._1), Some(dsPair._2)) => (left, right) :: Nil
+ case (Some(dsPair._2), Some(dsPair._1)) => (right, left) :: Nil
+ case _ => Nil
+ }
+ } else Nil
+ }
+}
+
+// Range membership test: "in", "not in", "between", "not between"
+// (operators matched case-insensitively) against a list of range elements.
+case class LogicalRangeExpr(left: MathExpr, rangeOpr: String, range: RangeDesc) extends LogicalExpr {
+ private val (inOpr, ninOpr, btwnOpr, nbtwnOpr) = ("""(?i)in""".r, """(?i)not\s+in""".r, """(?i)between""".r, """(?i)not\s+between""".r)
+ def calculateOnly(values: Map[String, Any]): Option[Any] = {
+ // range operators on Option values come from CalculationUtil implicits
+ val (lv, rvs) = (left.calculate(values), range.elements.map(_.calculate(values)))
+ rangeOpr match {
+ case this.inOpr() => lv in rvs
+ case this.ninOpr() => lv not_in rvs
+ case this.btwnOpr() => lv between rvs
+ case this.nbtwnOpr() => lv not_between rvs
+ case _ => None
+ }
+ }
+ val desc: String = s"${left.desc} ${rangeOpr} ${range.desc}"
+ val dataSources: Set[String] = left.dataSources ++ range.elements.flatMap(_.dataSources).toSet
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ left.getCacheExprs(ds) ++ range.elements.flatMap(_.getCacheExprs(ds))
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ left.getFinalCacheExprs(ds) ++ range.elements.flatMap(_.getFinalCacheExprs(ds))
+ }
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ left.getPersistExprs(ds) ++ range.elements.flatMap(_.getPersistExprs(ds))
+ }
+}
+
+// -- logical statement --
+//case class LogicalFactorExpr(self: LogicalExpr) extends LogicalExpr {
+// def calculate(values: Map[String, Any]): Option[Any] = self.calculate(values)
+// val desc: String = self.desc
+//}
+
+// Zero or more prefix negations ("not" / "!", case-insensitive) applied to
+// a logical factor.
+case class UnaryLogicalExpr(oprList: Iterable[String], factor: LogicalExpr) extends LogicalExpr {
+ private val notOpr = """(?i)not|!""".r
+ def calculateOnly(values: Map[String, Any]): Option[Any] = {
+ val fv = factor.calculate(values)
+ // apply operators innermost-first; "!" on Option comes from CalculationUtil
+ oprList.foldRight(fv) { (opr, v) =>
+ opr match {
+ case this.notOpr() => !v
+ case _ => None
+ }
+ }
+ }
+ // NOTE(review): foldRight passes (element, accumulator); the names
+ // (prev, ex) read swapped, but the result is the operators prefixed to
+ // the factor's description as intended.
+ val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" }
+ val dataSources: Set[String] = factor.dataSources
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ factor.getCacheExprs(ds)
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ factor.getFinalCacheExprs(ds)
+ }
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ factor.getPersistExprs(ds)
+ }
+
+ // Group-by pairs survive only when the negations cancel out
+ // (an even number of "not"/"!" operators).
+ override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = {
+ val notOprList = oprList.filter { opr =>
+ opr match {
+ case this.notOpr() => true
+ case _ => false
+ }
+ }
+ if (notOprList.size % 2 == 0) factor.getGroupbyExprPairs(dsPair) else Nil
+ }
+}
+
+// Left-to-right chain of logical terms joined by "and"/"&&" or "or"/"||"
+// (case-insensitive).
+case class BinaryLogicalExpr(first: LogicalExpr, others: Iterable[(String, LogicalExpr)]) extends LogicalExpr {
+ private val (andOpr, orOpr) = ("""(?i)and|&&""".r, """(?i)or|\|\|""".r)
+ def calculateOnly(values: Map[String, Any]): Option[Any] = {
+ // && and || on Option values come from CalculationUtil implicits
+ val fv = first.calculate(values)
+ others.foldLeft(fv) { (v, pair) =>
+ val (opr, next) = pair
+ val nv = next.calculate(values)
+ opr match {
+ case this.andOpr() => v && nv
+ case this.orOpr() => v || nv
+ case _ => None
+ }
+ }
+ }
+ val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
+ val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
+ }
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
+ }
+
+ override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = {
+ if (others.isEmpty) first.getGroupbyExprPairs(dsPair)
+ else {
+ // NOTE(review): `exists` means ONE "and" anywhere in a mixed and/or
+ // chain enables group-by extraction from every term; confirm whether
+ // `forall` (all connectors are "and") was intended.
+ val isAnd = others.exists(_._1 match {
+ case this.andOpr() => true
+ case _ => false
+ })
+ if (isAnd) {
+ first.getGroupbyExprPairs(dsPair) ++ others.flatMap(_._2.getGroupbyExprPairs(dsPair))
+ } else Nil
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
new file mode 100644
index 0000000..db09a0c
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
@@ -0,0 +1,79 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import org.apache.griffin.measure.batch.utils.CalculationUtil._
+
+// Marker trait for numeric-valued rule expressions (factors, unary and
+// binary arithmetic); adds no members beyond Expr.
+trait MathExpr extends Expr {
+
+}
+
+// Adapter wrapping an arbitrary Expr (literal, selection, parenthesized
+// expression, ...) as a math factor; all behavior delegates to the wrapped
+// expression.
+case class MathFactorExpr(self: Expr) extends MathExpr {
+ def calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values)
+ val desc: String = self.desc
+ val dataSources: Set[String] = self.dataSources
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ self.getCacheExprs(ds)
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ self.getFinalCacheExprs(ds)
+ }
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ self.getPersistExprs(ds)
+ }
+}
+
+// Zero or more prefix signs ("+" / "-") applied to a factor.
+case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr {
+ private val (posOpr, negOpr) = ("+", "-")
+ def calculateOnly(values: Map[String, Any]): Option[Any] = {
+ val fv = factor.calculate(values)
+ // apply signs innermost-first; unary "-" on Option comes from CalculationUtil
+ oprList.foldRight(fv) { (opr, v) =>
+ opr match {
+ case this.posOpr => v
+ case this.negOpr => -v
+ case _ => None
+ }
+ }
+ }
+ // NOTE(review): foldRight passes (element, accumulator); the names
+ // (prev, ex) read swapped, but the result prefixes the signs as intended.
+ val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" }
+ val dataSources: Set[String] = factor.dataSources
+ override def cacheUnit: Boolean = true
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ factor.getCacheExprs(ds)
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ factor.getFinalCacheExprs(ds)
+ }
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ factor.getPersistExprs(ds)
+ }
+}
+
+// Left-to-right chain of math terms joined by + - * / %. Note: strictly
+// left-associative, so no arithmetic precedence is applied here; precedence
+// is expected to be resolved by the parser that builds these nodes.
+case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr {
+ private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%")
+ def calculateOnly(values: Map[String, Any]): Option[Any] = {
+ // arithmetic on Option values comes from CalculationUtil implicits
+ val fv = first.calculate(values)
+ others.foldLeft(fv) { (v, pair) =>
+ val (opr, next) = pair
+ val nv = next.calculate(values)
+ opr match {
+ case this.addOpr => v + nv
+ case this.subOpr => v - nv
+ case this.mulOpr => v * nv
+ case this.divOpr => v / nv
+ case this.modOpr => v % nv
+ case _ => None
+ }
+ }
+ }
+ val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
+ val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
+ override def cacheUnit: Boolean = true
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
+ }
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
new file mode 100644
index 0000000..52ebe21
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
@@ -0,0 +1,53 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+// Base trait for selector steps chained after a selection head (index
+// ranges, function calls, filters). Selectors carry description/metadata
+// only and produce no standalone value, hence calculateOnly returns None.
+trait SelectExpr extends Expr {
+ def calculateOnly(values: Map[String, Any]): Option[Any] = None
+}
+
+// Index/field-range selector, rendered as "[f1,f2,...]" in descriptions.
+case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr {
+ val desc: String = s"[${fields.map(_.desc).mkString(",")}]"
+ val dataSources: Set[String] = Set.empty[String]
+}
+
+// Function-call selector, rendered as ".func(arg1,arg2,...)"; aggregates
+// data sources and cache/persist sub-expressions from its arguments.
+case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr {
+ val desc: String = s".${func}(${args.map(_.desc).mkString(",")})"
+ val dataSources: Set[String] = args.flatMap(_.dataSources).toSet
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds))
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds))
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds))
+}
+
+// Filter selector, rendered as "[field<compare>value]".
+// NOTE(review): dataSources covers only the value side, not the field —
+// confirm FieldDesc genuinely contributes no data sources.
+case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr {
+ val desc: String = s"[${field.desc}${compare}${value.desc}]"
+ val dataSources: Set[String] = value.dataSources
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds)
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds)
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds)
+}
+
+// -- selection --
+// -- selection --
+// Full selection: a head (e.g. a data-source alias) followed by a chain of
+// selector steps. Its value is not computed here but looked up from the
+// pre-calculated cache by this expression's id.
+case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr {
+ def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id)
+
+ val desc: String = {
+ val argsString = selectors.map(_.desc).mkString("")
+ s"${head.desc}${argsString}"
+ }
+ val dataSources: Set[String] = {
+ // the head itself names a data source, in addition to any referenced
+ // inside the selector chain
+ val selectorDataSources = selectors.flatMap(_.dataSources).toSet
+ selectorDataSources + head.head
+ }
+
+ override def cacheUnit: Boolean = true
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ selectors.flatMap(_.getCacheExprs(ds))
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ selectors.flatMap(_.getFinalCacheExprs(ds))
+ }
+
+ // selections are persisted for result output
+ override def persistUnit: Boolean = true
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ selectors.flatMap(_.getPersistExprs(ds))
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
new file mode 100644
index 0000000..a872e18
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
@@ -0,0 +1,52 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+
+// Base trait for top-level rule statements. `valid` gates whether a record
+// should be evaluated at all (default: always), e.g. for when-clauses.
+trait StatementExpr extends Expr with AnalyzableExpr {
+ def valid(values: Map[String, Any]): Boolean = true
+ override def cacheUnit: Boolean = true
+}
+
+// Statement consisting of a single logical expression; everything
+// delegates to the wrapped expression.
+case class SimpleStatementExpr(expr: LogicalExpr) extends StatementExpr {
+ def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+ val desc: String = expr.desc
+ val dataSources: Set[String] = expr.dataSources
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ expr.getCacheExprs(ds)
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ expr.getFinalCacheExprs(ds)
+ }
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ expr.getPersistExprs(ds)
+ }
+
+ override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = expr.getGroupbyExprPairs(dsPair)
+}
+
+// Statement with a when-clause: "expr when whenExpr". The when-clause
+// gates evaluation via valid(); the statement's value is expr alone.
+case class WhenClauseStatementExpr(expr: LogicalExpr, whenExpr: LogicalExpr) extends StatementExpr {
+ def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+ val desc: String = s"${expr.desc} when ${whenExpr.desc}"
+
+ // Record passes the gate only when the when-clause evaluates to
+ // Some(true); None or a non-Boolean result counts as invalid.
+ override def valid(values: Map[String, Any]): Boolean = {
+ whenExpr.calculate(values) match {
+ case Some(r: Boolean) => r
+ case _ => false
+ }
+ }
+
+ val dataSources: Set[String] = expr.dataSources ++ whenExpr.dataSources
+ override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+ expr.getCacheExprs(ds) ++ whenExpr.getCacheExprs(ds)
+ }
+ override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+ expr.getFinalCacheExprs(ds) ++ whenExpr.getFinalCacheExprs(ds)
+ }
+ override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+ expr.getPersistExprs(ds) ++ whenExpr.getPersistExprs(ds)
+ }
+
+ // group-by pairs are gathered from both the body and the when-clause
+ override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = {
+ expr.getGroupbyExprPairs(dsPair) ++ whenExpr.getGroupbyExprPairs(dsPair)
+ }
+ override def getWhenClauseExpr(): Option[LogicalExpr] = Some(whenExpr)
+}
\ No newline at end of file