You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/06/02 09:31:54 UTC

[5/6] incubator-griffin git commit: griffin-measure package modification

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
deleted file mode 100644
index 28dcd22..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.griffin.measure.batch.persist
-
-import org.apache.griffin.measure.batch.log.Loggable
-import org.apache.griffin.measure.batch.result._
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-
-trait Persist extends Loggable with Serializable { // contract of a persist back end; PersistFactory hands out HdfsPersist / HttpPersist instances typed as Persist
-  val timeStamp: Long // NOTE(review): presumably the timestamp of the measure run this persist writes for - confirm the unit
-
-  val config: Map[String, Any] // back-end specific settings; PersistFactory passes the config of the PersistParam
-
-  def available(): Boolean // PersistFactory only keeps persists for which this returns true
-
-  def start(msg: String): Unit // NOTE(review): presumably marks the beginning of a run - confirm against callers
-  def finish(): Unit // NOTE(review): presumably marks the end of a run - confirm against callers
-
-  def result(rt: Long, result: Result): Unit // persist a Result; NOTE(review): rt looks like a run time or timestamp - confirm
-
-  def missRecords(records: RDD[String]): Unit // persist the records that did not match, given as strings
-  def matchRecords(records: RDD[String]): Unit // persist the records that did match, given as strings
-
-  def log(rt: Long, msg: String): Unit // persist a log message; NOTE(review): rt as in result() - confirm
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
deleted file mode 100644
index 356934b..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.griffin.measure.batch.persist
-
-import org.apache.griffin.measure.batch.config.params.env._
-
-import scala.util.{Success, Try}
-
-
-case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {
-
-  val HDFS_REGEX = """^(?i)hdfs$""".r
-  val HTTP_REGEX = """^(?i)http$""".r
-
-  // collect the persists of all params that could be created and are available
-  def getPersists(timeStamp: Long): MultiPersists = {
-    val persists = persistParams.flatMap(getPersist(timeStamp, _))
-    MultiPersists(persists)
-  }
-
-  // create the persist matching the type of the param (case-insensitive "hdfs" or "http");
-  // None if the creation fails or the persist is not available, an exception for any other type
-  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
-    val conf = persistParam.config
-    val created = persistParam.persistType match {
-      case HDFS_REGEX() => Try(HdfsPersist(conf, metricName, timeStamp))
-      case HTTP_REGEX() => Try(HttpPersist(conf, metricName, timeStamp))
-      case _ => throw new Exception("not supported persist type")
-    }
-    created.toOption.filter(_.available)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
deleted file mode 100644
index 3ee7544..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.griffin.measure.batch.result
-
-// accuracy result, made of the miss count and the total count
-case class AccuracyResult(miss: Long, total: Long) extends Result {
-
-  type T = AccuracyResult
-
-  // the miss count is taken over from delta, the total count stays as it is
-  def update(delta: T): T = copy(miss = delta.miss)
-
-  // eventual as soon as no record is missing
-  def eventual(): Boolean = miss <= 0
-
-  // two results differ as soon as one of their counts differs
-  def differsFrom(other: T): Boolean = !(miss == other.miss && total == other.total)
-
-  def getMiss = miss
-  def getTotal = total
-  def getMatch = total - miss
-
-  // matched share in percent, 0 when the total count is not positive
-  def matchPercentage: Double = {
-    if (total > 0) (total - miss).toDouble / total * 100 else 0
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
deleted file mode 100644
index 0b52bfb..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.griffin.measure.batch.result
-
-// profile result, made of the match count and the total count
-case class ProfileResult(matchCount: Long, totalCount: Long) extends Result {
-
-  type T = ProfileResult
-
-  // the match count of delta is added on top, the total count stays as it is
-  def update(delta: T): T = copy(matchCount = matchCount + delta.matchCount)
-
-  // eventual as soon as the match count reaches the total count
-  def eventual(): Boolean = matchCount >= totalCount
-
-  // two results differ as soon as one of their counts differs
-  def differsFrom(other: T): Boolean = !(matchCount == other.matchCount && totalCount == other.totalCount)
-
-  def getMiss = totalCount - matchCount
-  def getTotal = totalCount
-  def getMatch = matchCount
-
-  // matched share in percent, 0 when the total count is not positive
-  def matchPercentage: Double = {
-    if (totalCount > 0) matchCount.toDouble / totalCount * 100 else 0
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
deleted file mode 100644
index 8d529db..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.griffin.measure.batch.result
-
-
-trait Result extends Serializable { // common contract of the measure results (AccuracyResult and ProfileResult extend it)
-
-  type T <: Result // the concrete result type; the visible implementations bind it to themselves
-
-  def update(delta: T): T // combine this result with a delta and return the new result
-
-  def eventual(): Boolean // true once nothing is left unmatched, per the implementations (miss <= 0, matchCount >= totalCount)
-
-  def differsFrom(other: T): Boolean // true when any count differs from the one of other, per the implementations
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
deleted file mode 100644
index 62df447..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.griffin.measure.batch.result
-
-
-sealed trait ResultInfo { // descriptor of a reserved result field: its key, a type name and the type T of its value
-  type T // type of the value stored under key
-  val key: String // reserved field name, wrapped in double underscores by all objects below
-  val tp: String // type name as a string; NOTE(review): looks like a SQL/Hive column type name - confirm
-  def wrap(value: T) = (key -> value) // pair the key with a value, as a tuple
-}
-
-final case object TimeGroupInfo extends ResultInfo { // Long value under "__time__"
-  type T = Long
-  val key = "__time__"
-  val tp = "bigint"
-}
-
-final case object NextFireTimeInfo extends ResultInfo { // Long value under "__next_fire_time__"
-  type T = Long
-  val key = "__next_fire_time__"
-  val tp = "bigint"
-}
-
-final case object MismatchInfo extends ResultInfo { // String value under "__mismatch__"
-  type T = String
-  val key = "__mismatch__"
-  val tp = "string"
-}
-
-final case object TargetInfo extends ResultInfo { // Map value under "__target__"
-  type T = Map[String, Any]
-  val key = "__target__"
-  val tp = "map"
-}
-
-final case object ErrorInfo extends ResultInfo { // String value under "__error__"
-  type T = String
-  val key = "__error__"
-  val tp = "string"
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
deleted file mode 100644
index 9f2e29d..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
+++ /dev/null
@@ -1,297 +0,0 @@
-package org.apache.griffin.measure.batch.rule
-
-import scala.util.{Success, Try}
-
-
-object CalculationUtil {
-
-  implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v) // lets any Option be the left operand of the operators below
-
-  // operators of the rule DSL, defined on optional and untyped values; the order of the cases matters in every match below
-  case class CalculationValue(value: Option[_]) extends Serializable {
-
-    def + (other: Option[_]): Option[_] = { // addition / concatenation, dispatched on the type of the left operand
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString) // string concatenation
-          case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte) // right operand is parsed as the type of the left one
-          case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
-          case _ => value // unsupported left operand type: left operand is returned unchanged
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None // any exception, e.g. a right operand that can not be parsed, gives None
-      }
-    }
-
-    def - (other: Option[_]): Option[_] = { // subtraction, numeric left operands only; same parsing and failure rules as +
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
-          case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble)
-          case _ => value
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def * (other: Option[_]): Option[_] = { // multiplication; a String left operand is repeated by an Int or Long right operand
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
-          case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
-          case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble)
-          case _ => value
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def / (other: Option[_]): Option[_] = { // division; an exception such as integer division by zero gives None
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
-          case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble)
-          case _ => value
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def % (other: Option[_]): Option[_] = { // remainder; same parsing and failure rules as /
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
-          case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble)
-          case _ => value
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def unary_- (): Option[_] = { // negation: numbers are negated, a Boolean is inverted, a String is reversed
-      value match {
-        case None => None
-        case Some(null) => None
-        case Some(v: String) => Some(v.reverse.toString)
-        case Some(v: Boolean) => Some(!v)
-        case Some(v: Byte) => Some(-v)
-        case Some(v: Short) => Some(-v)
-        case Some(v: Int) => Some(-v)
-        case Some(v: Long) => Some(-v)
-        case Some(v: Float) => Some(-v)
-        case Some(v: Double) => Some(-v)
-        case Some(v) => Some(v) // any other type is returned unchanged
-        case _ => None
-      }
-    }
-
-
-    def === (other: Option[_]): Option[Boolean] = { // equality, always defined: two None are equal, None never equals Some
-      (value, other) match {
-        case (None, None) => Some(true)
-        case (Some(v1), Some(v2)) => Some(v1 == v2)
-        case _ => Some(false)
-      }
-    }
-
-    def =!= (other: Option[_]): Option[Boolean] = { // inequality, the exact opposite of ===
-      (value, other) match {
-        case (None, None) => Some(false)
-        case (Some(v1), Some(v2)) => Some(v1 != v2)
-        case _ => Some(true)
-      }
-    }
-
-    def > (other: Option[_]): Option[Boolean] = { // greater than; None when an operand is missing, null or not comparable
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2: String)) => Some(v1 > v2) // strings are compared with strings only
-          case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble) // right operand is always parsed as Double
-          case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case _ => None
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def >= (other: Option[_]): Option[Boolean] = { // greater than or equal; unlike >, two None or two null operands give true
-      Try {
-        (value, other) match {
-          case (None, None) | (Some(null), Some(null)) => Some(true)
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case _ => None
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def < (other: Option[_]): Option[Boolean] = { // less than; same rules as >
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case _ => None
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def <= (other: Option[_]): Option[Boolean] = { // less than or equal; same rules as >=
-      Try {
-        (value, other) match {
-          case (None, None) | (Some(null), Some(null)) => Some(true)
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Float), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case _ => None
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-
-    def in (other: Iterable[Option[_]]): Option[Boolean] = { // true if value === one of the elements; Some(false) for an empty collection
-      other.foldLeft(Some(false): Option[Boolean]) { (res, next) =>
-        optOr(res, ===(next))
-      }
-    }
-
-    def not_in (other: Iterable[Option[_]]): Option[Boolean] = { // true if value =!= all of the elements; Some(true) for an empty collection
-      other.foldLeft(Some(true): Option[Boolean]) { (res, next) =>
-        optAnd(res, =!=(next))
-      }
-    }
-
-    def between (other: Iterable[Option[_]]): Option[Boolean] = { // begin <= value <= end, with the first two elements as begin and end
-      if (other.size < 2) None else { // fewer than two bounds: undefined
-        val (begin, end) = (other.head, other.tail.head)
-        if (begin.isEmpty && end.isEmpty) Some(value.isEmpty) // between (None, None) holds for a None value only
-        else optAnd(>=(begin), <=(end))
-      }
-    }
-
-    def not_between (other: Iterable[Option[_]]): Option[Boolean] = { // value < begin or value > end, with the first two elements as begin and end
-      if (other.size < 2) None else {
-        val (begin, end) = (other.head, other.tail.head)
-        if (begin.isEmpty && end.isEmpty) Some(value.nonEmpty)
-        else optOr(<(begin), >(end))
-      }
-    }
-
-    def unary_! (): Option[Boolean] = { // logical not, defined for a Boolean value only
-      optNot(value)
-    }
-
-    def && (other: Option[_]): Option[Boolean] = { // logical and of value and other, see optAnd
-      optAnd(value, other)
-    }
-
-    def || (other: Option[_]): Option[Boolean] = { // logical or of value and other, see optOr
-      optOr(value, other)
-    }
-
-
-    private def optNot(a: Option[_]): Option[Boolean] = { // None for anything that is not a Boolean
-      a match {
-        case None => None
-        case Some(null) => None
-        case Some(v: Boolean) => Some(!v)
-        case _ => None
-      }
-    }
-    private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = { // None / null are checked first, so false && None is None
-      (a, b) match {
-        case (None, _) | (_, None) => None
-        case (Some(null), _) | (_, Some(null)) => None
-        case (Some(false), _) | (_, Some(false)) => Some(false) // false wins even when the other side is not a Boolean
-        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
-        case _ => None
-      }
-    }
-    private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = { // None / null are checked first, so true || None is None
-      (a, b) match {
-        case (None, _) | (_, None) => None
-        case (Some(null), _) | (_, Some(null)) => None
-        case (Some(true), _) | (_, Some(true)) => Some(true) // true wins even when the other side is not a Boolean
-        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala
deleted file mode 100644
index ca42c5f..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package org.apache.griffin.measure.batch.rule
-
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.spark.sql.Row
-
-import scala.util.{Success, Try}
-
-object ExprValueUtil {
-
-  // calculate the value of one expression from the origin data (such as a Row of DataFrame)
-  // and the expression values that are already known; one expr yields at most one value
-  // - a selection expr threads the data through its selectors, one after another
-  // - an index/field selector reads one value from a Row, by index or by field name,
-  //   and gives None for non-Row data or when it does not hold exactly one field
-  // - any other expr is calculated from existExprValueMap only
-  // an exception thrown at any point results in None
-  private def calcExprValue(originData: Option[Any], expr: Expr, existExprValueMap: Map[String, Any]): Option[Any] = {
-    Try {
-      expr match {
-        case selection: SelectionExpr => {
-          selection.selectors.foldLeft(originData) { (dt, selector) =>
-            calcExprValue(dt, selector, existExprValueMap)
-          }
-        }
-        case selector: IndexFieldRangeSelectExpr => {
-          originData match {
-            case Some(row: Row) => {
-              if (selector.fields.size == 1) {
-                selector.fields.head match {
-                  case i: IndexDesc => Some(row.getAs[Any](i.index))
-                  case f: FieldDesc => Some(row.getAs[Any](f.field))
-                  case _ => None
-                }
-              } else None
-            }
-            case _ => None
-          }
-        }
-        case _ => expr.calculate(existExprValueMap)
-      }
-    } match {
-      case Success(v) => v
-      case _ => None
-    }
-  }
-
-  // calculate expr from data and initExprValueMap, and return initExprValueMap plus the value
-  // under the id of expr; when no value comes out, initExprValueMap is returned as it is
-  def genExprValueMap(data: Option[Any], expr: Expr, initExprValueMap: Map[String, Any]): Map[String, Any] = {
-    calcExprValue(data, expr, initExprValueMap) match {
-      case Some(v) => initExprValueMap + (expr._id -> v)
-      case _ => initExprValueMap
-    }
-  }
-
-  // calculate exprs one by one from the same data, each one seeing the values of the exprs before it
-  // note: data has to be handed to every calculation, otherwise selections get nothing to read from
-  def genExprValueMap(data: Option[Any], exprs: Iterable[Expr], initExprValueMap: Map[String, Any]): Map[String, Any] = {
-    exprs.foldLeft(initExprValueMap) { (evMap, expr) =>
-      ExprValueUtil.genExprValueMap(data, expr, evMap)
-    }
-  }
-
-  // calculate exprs from exprValueMap only, no origin data needed
-  // the result holds the values of exprs only, it does not carry over the entries of exprValueMap
-  def updateExprValueMap(exprs: Iterable[Expr], exprValueMap: Map[String, Any]): Map[String, Any] = {
-    exprs.foldLeft(Map[String, Any]()) { (evMap, expr) =>
-      expr.calculate(exprValueMap) match {
-        case Some(v) => evMap + (expr._id -> v)
-        case _ => evMap
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
deleted file mode 100644
index d129ad7..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.griffin.measure.batch.rule
-
-import org.apache.griffin.measure.batch.rule.expr._
-
-case class RuleAnalyzer(rule: StatementExpr) extends Serializable { // splits the exprs of a rule by data source name and bundles them per source
-
-  val constData = "" // data source name used to ask the rule for its const exprs
-  private val SourceData = "source"
-  private val TargetData = "target"
-
-  val constCacheExprs: Iterable[Expr] = rule.getCacheExprs(constData)
-  private val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
-  private val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)
-
-  private val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
-  private val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)
-
-  val constFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(constData).toSet // toSet removes duplicated exprs
-  private val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet // final cache exprs include the persist exprs
-  private val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet
-
-  private val groupbyExprPairs: Seq[(Expr, Expr)] = rule.getGroupbyExprPairs((SourceData, TargetData)) // pairs of (source expr, target expr)
-  private val sourceGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._1)
-  private val targetGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._2)
-
-  private val whenClauseExprOpt: Option[LogicalExpr] = rule.getWhenClauseExpr // one when clause, shared by both RuleExprs below
-
-  val sourceRuleExprs: RuleExprs = RuleExprs(sourceGroupbyExprs, sourceCacheExprs,
-    sourceFinalCacheExprs, sourcePersistExprs, whenClauseExprOpt)
-  val targetRuleExprs: RuleExprs = RuleExprs(targetGroupbyExprs, targetCacheExprs,
-    targetFinalCacheExprs, targetPersistExprs, whenClauseExprOpt)
-
-}
-
-
-// the exprs of a rule that belong to a single data source
-// groupbyExprs: in the accuracy case, these exprs can be used as group-by exprs
-//                  Data keys for the accuracy case, generated from the equal statements, to improve the calculation efficiency
-// cacheExprs: the exprs whose values can be calculated independently, and cached for later use
-//                  Cached for the finalCacheExprs calculation; it has some redundant values, so saving it is wasteful
-// finalCacheExprs: the roots of cacheExprs, cached for later use, plus the persistExprs
-//                  Cached for the calculation usage, and can be saved for the re-calculation in streaming mode
-// persistExprs: the exprs whose values should be persisted, only the direct selection exprs are persistable
-//                  Persisted for record usage, to record the missing data, needs to be readable as raw data
-// whenClauseExprOpt: the when clause of the rule, to determine if a row of the data source is filtered
-//                  Can be pre-calculated to filter some data in the data connector
-case class RuleExprs(groupbyExprs: Seq[Expr],
-                     cacheExprs: Iterable[Expr],
-                     finalCacheExprs: Iterable[Expr],
-                     persistExprs: Iterable[Expr],
-                     whenClauseExprOpt: Option[LogicalExpr]
-                    ) {
-  // for example: for a rule "$source.name = $target.name AND $source.age < $target.age + (3 * 4)"
-  // the targetRuleExprs of the target data source look like below
-  // groupbyExprs: $target.name
-  // cacheExprs: $target.name, $target.age, $target.age + (3 * 4)
-  // finalCacheExprs: $target.name, $target.age + (3 * 4), $target.age
-  // persistExprs: $target.name, $target.age
-  // whenClauseExprOpt: None
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
deleted file mode 100644
index cc3e8b3..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.griffin.measure.batch.rule
-
-import org.apache.griffin.measure.batch.config.params.user._
-
-import scala.util.Failure
-//import org.apache.griffin.measure.batch.rule.expr_old._
-import org.apache.griffin.measure.batch.rule.expr._
-
-import scala.util.{Success, Try}
-
-
-case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {
-
-  val ruleParser: RuleParser = RuleParser()
-
-  // parse the rules text of the param into a statement expr,
-  // a failed parse is reported by throwing its exception
-  def generateRule(): StatementExpr = {
-    parseExpr(evaluateRuleParam.rules) match {
-      case Failure(ex) => throw ex
-      case Success(statement) => statement
-    }
-  }
-
-  // the whole text has to be accepted by the rule parser,
-  // otherwise the Try fails with a "parse rule error!" exception;
-  // exceptions raised by the parser itself end up in the Try as well
-  private def parseExpr(rules: String): Try[StatementExpr] = Try {
-    val parsed = ruleParser.parseAll(ruleParser.rule, rules)
-    if (!parsed.successful) throw new Exception("parse rule error!")
-    parsed.get
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
deleted file mode 100644
index 49094e8..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-package org.apache.griffin.measure.batch.rule
-
-import org.apache.griffin.measure.batch.rule.expr._
-
-import scala.util.parsing.combinator._
-
-/**
-  * Combinator parser for the rule text; `rule` is the entry production and yields a StatementExpr.
-  * The grammar is described in the BNF comment at the top of the class body.
-  */
-case class RuleParser() extends JavaTokenParsers with Serializable {
-
-  /**
-    * BNF representation for grammar as below:
-    *
-    * <rule> ::= <logical-statement> [WHEN <logical-statement>]
-    * rule: mapping-rule [WHEN when-rule]
-    * - mapping-rule: the first level opr should better not be OR | NOT, otherwise it can't automatically find the groupby column
-    * - when-rule: only contain the general info of data source, not the special info of each data row
-    *
-    * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
-    * logical-statement: return boolean value
-    * logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
-    *
-    * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
-    * logical-expression example: $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
-    *
-    * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
-    * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
-    * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
-    * range-expr example: ('3214', '4312', '60821'), (10, 15), ()
-    *
-    * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
-    * math-expr example: $source.price * $target.count, "hello" + " " + "world" + 123
-    *
-    * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
-    * <unary-opr> ::= "+" | "-"
-    *
-    * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
-    *
-    * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
-    * selection example: $source.price, $source.json(), $source['state'], $source.numList[3], $target.json().mails['org' = 'apache'].names[*]
-    *
-    * <selection-head> ::= $source | $target
-    *
-    * <field-sel> ::= "." <field-string>
-    *
-    * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
-    * <function-name> ::= <name-string>
-    * <arg> ::= <math-expr>
-    *
-    * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
-    * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
-    * index-field-range: 2 means the 3rd item, (0, 3) means first 4 items, * means all items, 'age' means item 'age'
-    * <index-field> ::= <index> | <field-quote> | <all-selection>
-    * index: 0 ~ n means position from start, -1 ~ -n means position from end
-    * <field-quote> ::= ' <field-string> ' | " <field-string> "
-    *
-    * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
-    * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
-    * filter-sel example: ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5 ]
-    *
-    * When <math-expr> in the selection, it mustn't contain the different <selection-head>, for example:
-    * $source.tags[1+2]             valid
-    * $source.tags[$source.first]   valid
-    * $source.tags[$target.first]   invalid
-    * -- Such job is for validation, not for parser
-    *
-    *
-    * <literal> ::= <literal-string> | <literal-time> | <literal-number> | <literal-boolean> | <literal-null> | <literal-none>
-    * <literal-string> ::= <any-string>
-    * <literal-number> ::= <integer> | <double>
-    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
-    * <literal-boolean> ::= true | false
-    * <literal-null> ::= null | undefined
-    * <literal-none> ::= none
-    *
-    */
-
-  // Keyword tokens; all of them are matched case-insensitively.
-  object Keyword {
-    def WhenKeywords: Parser[String] = """(?i)when""".r
-    def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
-    def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
-    def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
-    def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
-    def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
-  }
-  import Keyword._
-
-  // Operator and punctuation tokens.
-  object Operator {
-    def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
-    def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
-    def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
-    def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
-    def RangeOpr: Parser[String] = RangeKeywords
-
-    def UnaryMathOpr: Parser[String] = "+" | "-"
-    def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
-    def BinaryMathOpr2: Parser[String] = "+" | "-"
-
-    def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
-
-    def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
-    def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
-    def Dot: Parser[String] = "."
-    def AllSelection: Parser[String] = "*"
-    def SQuote: Parser[String] = "'"
-    def DQuote: Parser[String] = "\""
-    def Comma: Parser[String] = ","
-  }
-  import Operator._
-
-  // String-like tokens.
-  object SomeString {
-    def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
-    def SimpleFieldString: Parser[String] = """\w+""".r
-    def FieldString: Parser[String] = """[\w\s]+""".r
-    def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
-  }
-  import SomeString._
-
-  // Numeric tokens.
-  object SomeNumber {
-    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
-    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
-    def IndexNumber: Parser[String] = IntegerNumber
-  }
-  import SomeNumber._
-
-  // -- literal --
-  // time is tried before number so that "10s" is not consumed as the number 10
-  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull | literialNone
-  def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
-  def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
-  // "ms" has to be tried before "m": regex alternation is ordered and the parser only needs a
-  // matching prefix, so with "m" first the input "10ms" was read as "10m" with a dangling "s"
-  def literialTime: Parser[LiteralTimeExpr] = """(\d+(ms|d|h|m|s))+""".r ^^ { LiteralTimeExpr(_) }
-  def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
-  def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r) ^^ { LiteralNullExpr(_) }
-  def literialNone: Parser[LiteralNoneExpr] = """(?i)none""".r ^^ { LiteralNoneExpr(_) }
-
-  // -- selection --
-  // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
-  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
-    case head ~ selectors => SelectionExpr(head, selectors)
-  }
-  // function-operation is tried before field-sel because both start with "." followed by a name
-  def selector: Parser[SelectExpr] = (functionOperation | fieldSelect | indexFieldRangeSelect | filterSelect)
-
-  def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
-  // <field-sel> ::= "." <field-string>
-  def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> SimpleFieldString ^^ {
-    case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
-  }
-  // <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
-  def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
-    case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
-  }
-  def argument: Parser[MathExpr] = mathExpr
-  // <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
-  def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
-    case ifrs => IndexFieldRangeSelectExpr(ifrs)
-  }
-  // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
-  def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
-    case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
-  }
-  // <index-field> ::= <index> | <field-quote> | <all-selection>
-  // *here it can parse <math-expr>, but for simple situation, not supported now*
-  def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
-  // <field-quote> ::= ' <field-string> ' | " <field-string> "
-  def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
-  // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
-  def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote ~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
-    case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
-  }
-
-  // -- math --
-  // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
-  def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
-  // <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
-  // <unary-opr> ::= "+" | "-"
-  def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
-    case Nil ~ a => a
-    case list ~ a => UnaryMathExpr(list, a)
-  }
-  // <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
-  // level 1 ("*", "/", "%") binds tighter than level 2 ("+", "-")
-  def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ unaryMathExpr) ^^ {
-    case a ~ Nil => a
-    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
-  }
-  def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 ~ binaryMathExpr1) ^^ {
-    case a ~ Nil => a
-    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
-  }
-  def mathExpr: Parser[MathExpr] = binaryMathExpr2
-
-  // -- logical expression --
-  // <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
-  def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) <~ BracketPair._2 ^^ { RangeDesc(_) }
-  // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
-  def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
-    case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
-  } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
-    case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
-  } | mathExpr ^^ { LogicalSimpleExpr(_) }
-
-  // -- logical statement --
-  // precedence, tightest first: NOT, AND, OR
-  def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> logicalStatement <~ BracketPair._2
-  def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ logicalFactor ^^ {
-    case Nil ~ a => a
-    case list ~ a => UnaryLogicalExpr(list, a)
-  }
-  def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
-    case a ~ Nil => a
-    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
-  }
-  def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
-    case a ~ Nil => a
-    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
-  }
-  // <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
-  def logicalStatement: Parser[LogicalExpr] = orLogicalStatement
-
-  // -- rule --
-  // <rule> ::= <logical-statement> [WHEN <logical-statement>]
-  def rule: Parser[StatementExpr] = logicalStatement ~ opt(WhenKeywords ~> logicalStatement) ^^ {
-    case ls ~ Some(ws) => WhenClauseStatementExpr(ls, ws)
-    case ls ~ _ => SimpleStatementExpr(ls)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
deleted file mode 100644
index 82c494c..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-
-/**
-  * Mixin for expressions that can be asked for group-by expression pairs and a when clause.
-  * Both members return "nothing" here; implementers override what they support.
-  */
-trait AnalyzableExpr extends Serializable {
-
-  /** Expression pairs found for the data source pair `dsPair`; empty unless overridden. */
-  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = Seq.empty
-
-  /** The when-clause expression if there is one; `None` unless overridden. */
-  def getWhenClauseExpr(): Option[LogicalExpr] = Option.empty
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
deleted file mode 100644
index e062376..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-/**
-  * Adds "may this expression be cached / persisted for data source `ds`" decisions
-  * on top of the data-source bookkeeping of DataSourceable.
-  *
-  * In both decisions an empty `ds` only matches an expression that has no data sources,
-  * while a non-empty `ds` must be one of the expression's data sources.
-  */
-trait Cacheable extends DataSourceable {
-
-  /** Switch for caching; off unless overridden. */
-  protected def cacheUnit: Boolean = false
-
-  /** Switch for persisting; off unless overridden. */
-  protected def persistUnit: Boolean = false
-
-  /** True when caching is switched on, the data sources do not conflict and `ds` matches. */
-  def cacheable(ds: String): Boolean =
-    cacheUnit && !conflict() && (if (ds.isEmpty) dataSources.isEmpty else contains(ds))
-
-  /** True when persisting is switched on and `ds` matches. */
-  def persistable(ds: String): Boolean =
-    persistUnit && (if (ds.isEmpty) dataSources.isEmpty else contains(ds))
-
-  protected def getCacheExprs(ds: String): Iterable[Cacheable]
-
-  protected def getPersistExprs(ds: String): Iterable[Cacheable]
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
deleted file mode 100644
index 8018c19..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-/** Something that can be calculated from a map of values. */
-trait Calculatable extends Serializable {
-  /**
-    * @param values the values available to the calculation, keyed by string
-    * @return the calculated result as an Option
-    */
-  def calculate(values: Map[String, Any]): Option[Any]
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
deleted file mode 100644
index f18798a..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-/** Something that knows the set of data source names it refers to. */
-trait DataSourceable extends Serializable {
-
-  val dataSources: Set[String]
-
-  /** More than one data source is involved. */
-  protected def conflict(): Boolean = dataSources.size > 1
-
-  /** Whether `ds` is one of the data sources. */
-  def contains(ds: String): Boolean = dataSources.contains(ds)
-
-  /** The data source when there is exactly one, otherwise `None`. */
-  def dataSourceOpt: Option[String] = dataSources.size match {
-    case 1 => dataSources.headOption
-    case _ => None
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
deleted file mode 100644
index 38758a2..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-/** Something that carries a textual description of itself. */
-trait Describable extends Serializable {
-
-  val desc: String
-
-  /**
-    * Renders a value for use inside a description: another Describable contributes its own
-    * `desc`, a string is wrapped in single quotes, anything else is interpolated as is.
-    */
-  protected def describe(v: Any): String = v match {
-    case describable: Describable => s"${describable.desc}"
-    case text: String => s"'${text}'"
-    case other => s"${other}"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
deleted file mode 100644
index d7810aa..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-/**
-  * Base trait of the rule expressions. Every expression gets an id from ExprIdCounter, can
-  * list the (sub-)expressions to cache or persist for a data source, and is calculated by
-  * first looking its id up in the given values and only then computing the result.
-  */
-trait Expr extends Serializable with Describable with Cacheable with Calculatable {
-
-  protected val _defaultId: String = ExprIdCounter.emptyId
-
-  // declared after _defaultId on purpose: it is initialised from it
-  val _id = ExprIdCounter.genId(_defaultId)
-
-  // hooks for composite expressions; an expression without children keeps the empty defaults
-  protected def getSubCacheExprs(ds: String): Iterable[Expr] = Nil
-  protected def getSubFinalCacheExprs(ds: String): Iterable[Expr] = Nil
-  protected def getSubPersistExprs(ds: String): Iterable[Expr] = Nil
-
-  /** The sub-expressions' cache expressions, followed by this one when it is cacheable for `ds`. */
-  final def getCacheExprs(ds: String): Iterable[Expr] = {
-    val includeSelf = cacheable(ds)
-    val nested = getSubCacheExprs(ds)
-    if (includeSelf) nested.toList :+ this else nested
-  }
-
-  /** Only this expression when it is cacheable for `ds`; otherwise whatever the sub-expressions report. */
-  final def getFinalCacheExprs(ds: String): Iterable[Expr] = {
-    if (cacheable(ds)) List(this) else getSubFinalCacheExprs(ds)
-  }
-
-  /** The sub-expressions' persist expressions, followed by this one when it is persistable for `ds`. */
-  final def getPersistExprs(ds: String): Iterable[Expr] = {
-    val includeSelf = persistable(ds)
-    val nested = getSubPersistExprs(ds)
-    if (includeSelf) nested.toList :+ this else nested
-  }
-
-  /** A value already present under this expression's id wins; otherwise the value is computed. */
-  final def calculate(values: Map[String, Any]): Option[Any] =
-    values.get(_id).orElse(calculateOnly(values))
-
-  protected def calculateOnly(values: Map[String, Any]): Option[Any]
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
deleted file mode 100644
index 0bb5085..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-/** Marker for rule elements that only carry a description. */
-trait ExprDescOnly extends Describable
-
-
-/**
-  * Head of a selection. For text of the form `$name` the head is the lower-cased name;
-  * any other text is taken over unchanged.
-  *
-  * @param expr the head text as written in the rule
-  */
-case class SelectionHead(expr: String) extends ExprDescOnly {
-  private val headRegex = """\$(\w+)""".r
-
-  val head: String = expr match {
-    case headRegex(name) => name.toLowerCase
-    case unchanged => unchanged
-  }
-
-  val desc: String = "$" + head
-}
-
-/**
-  * Elements of a range; described as the element descriptions joined by ", " in parentheses.
-  *
-  * @param elements the math expressions that make up the range
-  */
-case class RangeDesc(elements: Iterable[MathExpr]) extends ExprDescOnly {
-  val desc: String = elements.map(_.desc).mkString("(", ", ", ")")
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
deleted file mode 100644
index 56e7daa..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.mutable.{Set => MutableSet}
-
-/**
-  * Hands out expression ids: either a fresh number from a counter, or the caller's own id.
-  */
-object ExprIdCounter {
-
-  private val idCounter: AtomicLong = new AtomicLong(0L)
-
-  // user-supplied ids seen so far
-  // NOTE(review): unlike the counter this set is not synchronised — confirm ids are only
-  // generated from a single thread
-  private val existIdSet: MutableSet[String] = MutableSet.empty[String]
-
-  // purely numeric ids are reserved for the counter
-  private val invalidIdRegex = """^\d+$""".r
-
-  val emptyId: String = ""
-
-  /**
-    * Generates the id for an expression.
-    *
-    * @param defaultId the id the caller would like to use; `emptyId` asks for a generated one
-    * @return the next counter value as a string when `defaultId` is null, empty or purely
-    *         numeric; otherwise `defaultId` itself, which is also remembered as a user id
-    */
-  def genId(defaultId: String): String = {
-    defaultId match {
-      // `emptyId` needs the backticks: a bare lower-case name in a pattern is a fresh variable
-      // that matches everything, which made every later case unreachable
-      case null | `emptyId` => increment().toString
-      case invalidIdRegex() => increment().toString
-//      case defId if (exist(defId)) => s"${increment}#${defId}"
-      case defId if (exist(defId)) => s"${defId}"
-      case _ => {
-        insertUserId(defaultId)
-        defaultId
-      }
-    }
-  }
-
-  private def exist(id: String): Boolean = {
-    existIdSet.contains(id)
-  }
-
-  private def insertUserId(id: String): Unit = {
-    existIdSet += id
-  }
-
-  private def increment(): Long = {
-    idCounter.incrementAndGet()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
deleted file mode 100644
index f13f15a..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-import scala.util.{Success, Try}
-
-/** Marker for field descriptions: describable and aware of data sources. */
-trait FieldDescOnly extends Describable with DataSourceable
-
-/**
-  * A field description given as an integer index.
-  *
-  * @param expr the index as text; construction fails when it is not a valid Int
-  */
-case class IndexDesc(expr: String) extends FieldDescOnly {
-  val index: Int = Try(expr.toInt).getOrElse(throw new Exception(s"${expr} is invalid index"))
-  val desc: String = describe(index)
-  val dataSources: Set[String] = Set.empty[String]
-}
-
-/**
-  * A field description given as a field name; described as the quoted name.
-  *
-  * @param expr the field name
-  */
-case class FieldDesc(expr: String) extends FieldDescOnly {
-  val field: String = expr
-
-  val desc: String = describe(field)
-
-  val dataSources: Set[String] = Set.empty[String]
-}
-
-/**
-  * The "all fields" description; its description is the given text itself.
-  *
-  * @param expr the text that selected all fields
-  */
-case class AllFieldsDesc(expr: String) extends FieldDescOnly {
-  val allFields: String = expr
-
-  val desc: String = allFields
-
-  val dataSources: Set[String] = Set.empty[String]
-}
-
-/**
-  * A range between two field descriptions. Only a pair of IndexDesc is accepted;
-  * construction fails for anything else.
-  *
-  * @param startField start of the range
-  * @param endField   end of the range
-  */
-case class FieldRangeDesc(startField: FieldDescOnly, endField: FieldDescOnly) extends FieldDescOnly {
-  val desc: String = (startField, endField) match {
-    case (from: IndexDesc, to: IndexDesc) => s"(${from.desc}, ${to.desc})"
-    case _ => throw new Exception("invalid field range description")
-  }
-
-  val dataSources: Set[String] = Set.empty[String]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
deleted file mode 100644
index 8bf0dd6..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-import scala.util.{Failure, Success, Try}
-
-/**
-  * An expression with a fixed value: it calculates to `value` whatever the inputs are
-  * and refers to no data source.
-  */
-trait LiteralExpr extends Expr {
-
-  val value: Option[Any]
-
-  val dataSources: Set[String] = Set.empty[String]
-
-  def calculateOnly(values: Map[String, Any]): Option[Any] = value
-}
-
-/**
-  * A string literal; described as the text in single quotes.
-  *
-  * @param expr the literal text
-  */
-case class LiteralStringExpr(expr: String) extends LiteralExpr {
-  val value: Option[String] = Some(expr)
-
-  // value is always Some(expr), so the description can be built from expr directly
-  val desc: String = s"'${expr}'"
-}
-
-/**
-  * A number literal: text containing "." is read as a Double, any other text as a Long.
-  * Construction fails when the text is not a valid number.
-  *
-  * @param expr the number as text
-  */
-case class LiteralNumberExpr(expr: String) extends LiteralExpr {
-  val value: Option[Any] = {
-    // typed as Try[Any] on purpose, so that a Long is never widened to a Double
-    val parsed: Try[Any] = if (expr.contains(".")) Try[Any](expr.toDouble) else Try[Any](expr.toLong)
-    parsed match {
-      case Success(number) => Some(number)
-      case _ => throw new Exception(s"${expr} is invalid number")
-    }
-  }
-
-  val desc: String = value.map(_.toString).getOrElse("")
-}
-
-/**
-  * A time literal whose value is the amount of time in milliseconds.
-  *
-  * The text is one or more components of the form `<digits><unit>` with unit d, h, m, s or ms,
-  * e.g. "10ms", "2h" or "1h30m"; the components are summed. Construction fails with an
-  * Exception for any other text.
-  *
-  * @param expr the time as text
-  */
-case class LiteralTimeExpr(expr: String) extends LiteralExpr {
-  final val TimeRegex = """(\d+)(d|h|m|s|ms)""".r
-  val value: Option[Long] = {
-    def invalid: Exception = new Exception(s"${expr} is invalid time format")
-    // Cut the text into "<digits><letters>" pieces first, so that every piece is matched as a
-    // whole by TimeRegex; that way "10ms" is one millisecond component and not "10m" plus "s".
-    val components = Option(expr).toList.flatMap(text => """\d+[a-z]+""".r.findAllIn(text).toList)
-    // the pieces must cover the whole text, otherwise there are stray characters in between
-    if (components.isEmpty || components.mkString != expr) throw invalid
-    val millis = components.map {
-      case TimeRegex(time, unit) => {
-        val t = time.toLong
-        unit match {
-          case "d" => t * 24 * 60 * 60 * 1000
-          case "h" => t * 60 * 60 * 1000
-          case "m" => t * 60 * 1000
-          case "s" => t * 1000
-          case "ms" => t
-          case _ => throw invalid
-        }
-      }
-      case _ => throw invalid
-    }
-    Some(millis.sum)
-  }
-  val desc: String = expr
-}
-
-/**
-  * A boolean literal, "true" or "false" in any letter case; construction fails otherwise.
-  *
-  * @param expr the boolean as text
-  */
-case class LiteralBooleanExpr(expr: String) extends LiteralExpr {
-  final val TrueRegex = """(?i)true""".r
-  final val FalseRegex = """(?i)false""".r
-
-  val value: Option[Boolean] = Some(expr match {
-    case TrueRegex() => true
-    case FalseRegex() => false
-    case _ => throw new Exception(s"${expr} is invalid boolean")
-  })
-
-  val desc: String = value.map(_.toString).getOrElse("")
-}
-
-/**
-  * The null literal: a present value that is `null`, as opposed to LiteralNoneExpr.
-  *
-  * @param expr the text that was written for the literal; not used for the value
-  */
-case class LiteralNullExpr(expr: String) extends LiteralExpr {
-  val desc: String = "null"
-
-  val value: Option[Any] = Some(null)
-}
-
-/**
-  * The none literal: no value at all.
-  *
-  * @param expr the text that was written for the literal; not used for the value
-  */
-case class LiteralNoneExpr(expr: String) extends LiteralExpr {
-  val desc: String = "none"
-
-  val value: Option[Any] = None
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
deleted file mode 100644
index c67f8ef..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-import org.apache.griffin.measure.batch.rule.CalculationUtil._
-
-/** Base trait of the logical expressions; they are caching units unless a subclass says otherwise. */
-trait LogicalExpr extends Expr with AnalyzableExpr {
-
-  override def cacheUnit: Boolean = true
-
-}
-
-/**
-  * A math expression used where a logical expression is expected. Everything is delegated
-  * to the wrapped expression, and the wrapper itself is not a caching unit.
-  *
-  * @param expr the wrapped math expression
-  */
-case class LogicalSimpleExpr(expr: MathExpr) extends LogicalExpr {
-
-  def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    expr.calculate(values)
-  }
-
-  val desc: String = expr.desc
-  val dataSources: Set[String] = expr.dataSources
-
-  override def cacheUnit: Boolean = false
-
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getCacheExprs(ds)
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getFinalCacheExprs(ds)
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    expr.getPersistExprs(ds)
-  }
-}
-
-/**
-  * Comparison of two math expressions.
-  *
-  * @param left    left operand
-  * @param compare the comparison operator as written in the rule
-  * @param right   right operand
-  */
-case class LogicalCompareExpr(left: MathExpr, compare: String, right: MathExpr) extends LogicalExpr {
-  // equality and inequality accept one or two "=" characters
-  private val eqOpr = """==?""".r
-  private val neqOpr = """!==?""".r
-  private val btOpr = ">"
-  private val bteOpr = ">="
-  private val ltOpr = "<"
-  private val lteOpr = "<="
-
-  /** Calculates both operands and compares them; an unknown operator gives `None`. */
-  def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    val leftValue = left.calculate(values)
-    val rightValue = right.calculate(values)
-    compare match {
-      case this.eqOpr() => leftValue === rightValue
-      case this.neqOpr() => leftValue =!= rightValue
-      case this.btOpr => leftValue > rightValue
-      case this.bteOpr => leftValue >= rightValue
-      case this.ltOpr => leftValue < rightValue
-      case this.lteOpr => leftValue <= rightValue
-      case _ => None
-    }
-  }
-
-  val desc: String = s"${left.desc} ${compare} ${right.desc}"
-  val dataSources: Set[String] = left.dataSources ++ right.dataSources
-
-  override def getSubCacheExprs(ds: String): Iterable[Expr] =
-    left.getCacheExprs(ds) ++ right.getCacheExprs(ds)
-
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] =
-    left.getFinalCacheExprs(ds) ++ right.getFinalCacheExprs(ds)
-
-  override def getSubPersistExprs(ds: String): Iterable[Expr] =
-    left.getPersistExprs(ds) ++ right.getPersistExprs(ds)
-
-  /**
-    * Only an equality ("=" or "==") whose two sides each belong to exactly one data source of
-    * `dsPair`, one side per data source, yields a pair; the pair is ordered like `dsPair`.
-    */
-  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
-    val (firstDs, secondDs) = dsPair
-    val isEquality = compare == "=" || compare == "=="
-    if (!isEquality) Nil
-    else (left.dataSourceOpt, right.dataSourceOpt) match {
-      case (Some(`firstDs`), Some(`secondDs`)) => (left, right) :: Nil
-      case (Some(`secondDs`), Some(`firstDs`)) => (right, left) :: Nil
-      case _ => Nil
-    }
-  }
-}
-
-/**
-  * Test of a math expression against a range: in, not in, between or not between.
-  *
-  * @param left     the value to test
-  * @param rangeOpr the range operator as written in the rule (matched case-insensitively)
-  * @param range    the range elements
-  */
-case class LogicalRangeExpr(left: MathExpr, rangeOpr: String, range: RangeDesc) extends LogicalExpr {
-  private val inOpr = """(?i)in""".r
-  private val ninOpr = """(?i)not\s+in""".r
-  private val btwnOpr = """(?i)between""".r
-  private val nbtwnOpr = """(?i)not\s+between""".r
-
-  /** Calculates the value and every range element, then applies the operator; unknown operators give `None`. */
-  def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    val leftValue = left.calculate(values)
-    val rangeValues = range.elements.map(_.calculate(values))
-    rangeOpr match {
-      case this.inOpr() => leftValue in rangeValues
-      case this.ninOpr() => leftValue not_in rangeValues
-      case this.btwnOpr() => leftValue between rangeValues
-      case this.nbtwnOpr() => leftValue not_between rangeValues
-      case _ => None
-    }
-  }
-
-  val desc: String = s"${left.desc} ${rangeOpr} ${range.desc}"
-  val dataSources: Set[String] = left.dataSources ++ range.elements.flatMap(_.dataSources).toSet
-
-  override def getSubCacheExprs(ds: String): Iterable[Expr] =
-    left.getCacheExprs(ds) ++ range.elements.flatMap(_.getCacheExprs(ds))
-
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] =
-    left.getFinalCacheExprs(ds) ++ range.elements.flatMap(_.getFinalCacheExprs(ds))
-
-  override def getSubPersistExprs(ds: String): Iterable[Expr] =
-    left.getPersistExprs(ds) ++ range.elements.flatMap(_.getPersistExprs(ds))
-}
-
-// -- logical statement --
-//case class LogicalFactorExpr(self: LogicalExpr) extends LogicalExpr {
-//  def calculate(values: Map[String, Any]): Option[Any] = self.calculate(values)
-//  val desc: String = self.desc
-//}
-
-/**
-  * A logical expression preceded by a list of unary operators ("not" in any case, or "!").
-  *
-  * @param oprList the operators in the order they were written
-  * @param factor  the expression they apply to
-  */
-case class UnaryLogicalExpr(oprList: Iterable[String], factor: LogicalExpr) extends LogicalExpr {
-  private val notOpr = """(?i)not|!""".r
-
-  /** Applies the operators from the innermost (last written) outwards; an unknown operator gives `None`. */
-  def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    val factorValue = factor.calculate(values)
-    oprList.foldRight(factorValue) { (opr, current) =>
-      opr match {
-        case this.notOpr() => !current
-        case _ => None
-      }
-    }
-  }
-
-  // each operator is put in front of what has been built so far
-  val desc: String = oprList.foldRight(factor.desc) { (opr, built) => s"${opr} ${built}" }
-  val dataSources: Set[String] = factor.dataSources
-
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = factor.getCacheExprs(ds)
-
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = factor.getFinalCacheExprs(ds)
-
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = factor.getPersistExprs(ds)
-
-  /** An even number of negations leaves the factor's pairs usable; an odd number yields none. */
-  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
-    val negations = oprList.count {
-      case this.notOpr() => true
-      case _ => false
-    }
-    if (negations % 2 == 0) factor.getGroupbyExprPairs(dsPair) else Nil
-  }
-}
-
-/**
-  * A chain of logical expressions joined by "and"/"&&" or "or"/"||", evaluated left to right.
-  *
-  * @param first  the first expression
-  * @param others the following (operator, expression) pairs in written order
-  */
-case class BinaryLogicalExpr(first: LogicalExpr, others: Iterable[(String, LogicalExpr)]) extends LogicalExpr {
-  private val andOpr = """(?i)and|&&""".r
-  private val orOpr = """(?i)or|\|\|""".r
-
-  /** Folds the chain from the left; an unknown operator turns the running result into `None`. */
-  def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    val firstValue = first.calculate(values)
-    others.foldLeft(firstValue) { (current, step) =>
-      val nextValue = step._2.calculate(values)
-      step._1 match {
-        case this.andOpr() => current && nextValue
-        case this.orOpr() => current || nextValue
-        case _ => None
-      }
-    }
-  }
-
-  val desc: String = others.foldLeft(first.desc) { (built, step) => s"${built} ${step._1} ${step._2.desc}" }
-  val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
-
-  override def getSubCacheExprs(ds: String): Iterable[Expr] =
-    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
-
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] =
-    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
-
-  override def getSubPersistExprs(ds: String): Iterable[Expr] =
-    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
-
-  /**
-    * A single expression reports its own pairs. A chain reports the pairs of all its members
-    * when it contains an "and" operator, and none otherwise.
-    */
-  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
-    val containsAnd = others.exists { step =>
-      step._1 match {
-        case this.andOpr() => true
-        case _ => false
-      }
-    }
-    if (others.isEmpty) first.getGroupbyExprPairs(dsPair)
-    else if (containsAnd) first.getGroupbyExprPairs(dsPair) ++ others.flatMap(_._2.getGroupbyExprPairs(dsPair))
-    else Nil
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
deleted file mode 100644
index 2da4098..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-import org.apache.griffin.measure.batch.rule.CalculationUtil._
-
-trait MathExpr extends Expr {
-
-}
-
-case class MathFactorExpr(self: Expr) extends MathExpr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values)
-  val desc: String = self.desc
-  val dataSources: Set[String] = self.dataSources
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    self.getCacheExprs(ds)
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    self.getFinalCacheExprs(ds)
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    self.getPersistExprs(ds)
-  }
-}
-
-case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr {
-  private val (posOpr, negOpr) = ("+", "-")
-  def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    val fv = factor.calculate(values)
-    oprList.foldRight(fv) { (opr, v) =>
-      opr match {
-        case this.posOpr => v
-        case this.negOpr => -v
-        case _ => None
-      }
-    }
-  }
-  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" }
-  val dataSources: Set[String] = factor.dataSources
-  override def cacheUnit: Boolean = true
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    factor.getCacheExprs(ds)
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    factor.getFinalCacheExprs(ds)
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    factor.getPersistExprs(ds)
-  }
-}
-
-case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr {
-  private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%")
-  def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    val fv = first.calculate(values)
-    others.foldLeft(fv) { (v, pair) =>
-      val (opr, next) = pair
-      val nv = next.calculate(values)
-      opr match {
-        case this.addOpr => v + nv
-        case this.subOpr => v - nv
-        case this.mulOpr => v * nv
-        case this.divOpr => v / nv
-        case this.modOpr => v % nv
-        case _ => None
-      }
-    }
-  }
-  val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
-  val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
-  override def cacheUnit: Boolean = true
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
deleted file mode 100644
index b648292..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-trait SelectExpr extends Expr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = None
-}
-
-case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr {
-  val desc: String = s"[${fields.map(_.desc).mkString(", ")}]"
-  val dataSources: Set[String] = Set.empty[String]
-}
-
-case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr {
-  val desc: String = s".${func}(${args.map(_.desc).mkString(", ")})"
-  val dataSources: Set[String] = args.flatMap(_.dataSources).toSet
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds))
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds))
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds))
-}
-
-case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr {
-  val desc: String = s"[${field.desc} ${compare} ${value.desc}]"
-  val dataSources: Set[String] = value.dataSources
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds)
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds)
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds)
-}
-
-// -- selection --
-case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id)
-
-  val desc: String = {
-    val argsString = selectors.map(_.desc).mkString("")
-    s"${head.desc}${argsString}"
-  }
-  val dataSources: Set[String] = {
-    val selectorDataSources = selectors.flatMap(_.dataSources).toSet
-    selectorDataSources + head.head
-  }
-
-  override def cacheUnit: Boolean = true
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    selectors.flatMap(_.getCacheExprs(ds))
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    selectors.flatMap(_.getFinalCacheExprs(ds))
-  }
-
-  override def persistUnit: Boolean = true
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    selectors.flatMap(_.getPersistExprs(ds))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
deleted file mode 100644
index 4c1921b..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.apache.griffin.measure.batch.rule.expr
-
-
-trait StatementExpr extends Expr with AnalyzableExpr {
-  def valid(values: Map[String, Any]): Boolean = true
-  override def cacheUnit: Boolean = true
-}
-
-case class SimpleStatementExpr(expr: LogicalExpr) extends StatementExpr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
-  val desc: String = expr.desc
-  val dataSources: Set[String] = expr.dataSources
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getCacheExprs(ds)
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getFinalCacheExprs(ds)
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    expr.getPersistExprs(ds)
-  }
-
-  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = expr.getGroupbyExprPairs(dsPair)
-}
-
-case class WhenClauseStatementExpr(expr: LogicalExpr, whenExpr: LogicalExpr) extends StatementExpr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
-  val desc: String = s"${expr.desc} when ${whenExpr.desc}"
-
-  override def valid(values: Map[String, Any]): Boolean = {
-    whenExpr.calculate(values) match {
-      case Some(r: Boolean) => r
-      case _ => false
-    }
-  }
-
-  val dataSources: Set[String] = expr.dataSources ++ whenExpr.dataSources
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getCacheExprs(ds) ++ whenExpr.getCacheExprs(ds)
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getFinalCacheExprs(ds) ++ whenExpr.getFinalCacheExprs(ds)
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    expr.getPersistExprs(ds) ++ whenExpr.getPersistExprs(ds)
-  }
-
-  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
-    expr.getGroupbyExprPairs(dsPair) ++ whenExpr.getGroupbyExprPairs(dsPair)
-  }
-  override def getWhenClauseExpr(): Option[LogicalExpr] = Some(whenExpr)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
deleted file mode 100644
index b48478a..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.griffin.measure.batch.utils
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
-
-object HdfsUtil {
-
-  private val seprator = "/"
-
-  private val conf = new Configuration()
-  conf.set("dfs.support.append", "true")
-
-  private val dfs = FileSystem.get(conf)
-
-  def existPath(filePath: String): Boolean = {
-    val path = new Path(filePath)
-    dfs.exists(path)
-  }
-
-  def createFile(filePath: String): FSDataOutputStream = {
-    val path = new Path(filePath)
-    if (dfs.exists(path)) dfs.delete(path, true)
-    return dfs.create(path)
-  }
-
-  def appendOrCreateFile(filePath: String): FSDataOutputStream = {
-    val path = new Path(filePath)
-    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
-  }
-
-  def openFile(filePath: String): FSDataInputStream = {
-    val path = new Path(filePath)
-    dfs.open(path)
-  }
-
-  def writeContent(filePath: String, message: String): Unit = {
-    val out = createFile(filePath)
-    out.write(message.getBytes("utf-8"))
-    out.close
-  }
-
-  def appendContent(filePath: String, message: String): Unit = {
-    val out = appendOrCreateFile(filePath)
-    out.write(message.getBytes("utf-8"))
-    out.close
-  }
-
-  def createEmptyFile(filePath: String): Unit = {
-    val out = createFile(filePath)
-    out.close
-  }
-
-
-  def getHdfsFilePath(parentPath: String, fileName: String): String = {
-    if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + seprator + fileName
-  }
-
-  def deleteHdfsPath(dirPath: String): Unit = {
-    val path = new Path(dirPath)
-    if (dfs.exists(path)) dfs.delete(path, true)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
deleted file mode 100644
index 747d0fa..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.griffin.measure.batch.utils
-
-import scalaj.http._
-
-object HttpUtil {
-
-  val GET_REGEX = """^(?i)get$""".r
-  val POST_REGEX = """^(?i)post$""".r
-  val PUT_REGEX = """^(?i)put$""".r
-  val DELETE_REGEX = """^(?i)delete$""".r
-
-  def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
-    val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
-    response.code.toString
-  }
-
-  def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
-    val httpReq = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers))
-    method match {
-      case POST_REGEX() => httpReq.postData(data).asString.code.toString
-      case PUT_REGEX() => httpReq.put(data).asString.code.toString
-      case _ => "wrong method"
-    }
-  }
-
-  private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = {
-    map.map(pair => pair._1 -> pair._2.toString)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
deleted file mode 100644
index cdd470a..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.griffin.measure.batch.utils
-
-import java.io.InputStream
-
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-
-import scala.reflect._
-
-object JsonUtil {
-  val mapper = new ObjectMapper()
-  mapper.registerModule(DefaultScalaModule)
-  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
-
-  def toJson(value: Map[Symbol, Any]): String = {
-    toJson(value map { case (k,v) => k.name -> v})
-  }
-
-  def toJson(value: Any): String = {
-    mapper.writeValueAsString(value)
-  }
-
-  def toMap[V](json:String)(implicit m: Manifest[V]) = fromJson[Map[String,V]](json)
-
-  def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = {
-    mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]])
-  }
-
-  def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = {
-    mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/config-profile.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/config-profile.json b/measure/measure-batch/src/test/resources/config-profile.json
deleted file mode 100644
index 6b82d7f..0000000
--- a/measure/measure-batch/src/test/resources/config-profile.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
-  "name": "prof1",
-  "type": "profile",
-
-  "source": {
-    "type": "avro",
-    "version": "1.7",
-    "config": {
-      "file.name": "src/test/resources/users_info_src.avro"
-    }
-  },
-
-  "evaluateRule": {
-    "sampleRatio": 1,
-    "rules": "$source.post_code == null"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/config.json b/measure/measure-batch/src/test/resources/config.json
deleted file mode 100644
index 65e0ed9..0000000
--- a/measure/measure-batch/src/test/resources/config.json
+++ /dev/null
@@ -1,25 +0,0 @@
-{
-  "name": "accu1",
-  "type": "accuracy",
-
-  "source": {
-    "type": "avro",
-    "version": "1.7",
-    "config": {
-      "file.name": "src/test/resources/users_info_src.avro"
-    }
-  },
-
-  "target": {
-    "type": "avro",
-    "version": "1.7",
-    "config": {
-      "file.name": "src/test/resources/users_info_target.avro"
-    }
-  },
-
-  "evaluateRule": {
-    "sampleRatio": 1,
-    "rules": "$source.user_id > 10020 AND $source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/config1.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/config1.json b/measure/measure-batch/src/test/resources/config1.json
deleted file mode 100644
index d7290ba..0000000
--- a/measure/measure-batch/src/test/resources/config1.json
+++ /dev/null
@@ -1,27 +0,0 @@
-{
-  "name": "accu-test",
-  "type": "accuracy",
-
-  "source": {
-    "type": "hive",
-    "version": "1.2",
-    "config": {
-      "table.name": "rheos_view_event",
-      "partitions": "dt=20170410, hour=15"
-    }
-  },
-
-  "target": {
-    "type": "hive",
-    "version": "1.2",
-    "config": {
-      "table.name": "be_view_event_queue",
-      "partitions": "dt=20170410, hour=15; dt=20170410, hour=16"
-    }
-  },
-
-  "evaluateRule": {
-    "sampleRatio": 1,
-    "rules": "@Key ${source}['uid'] === ${target}['uid']; @Key ${source}['eventtimestamp'] === ${target}['eventtimestamp']; ${source}['page_id'] === ${target}['page_id']; ${source}['site_id'] === ${target}['site_id']; ${source}['itm'] === ${target}['itm']"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/env.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/env.json b/measure/measure-batch/src/test/resources/env.json
deleted file mode 100644
index 3a9e38c..0000000
--- a/measure/measure-batch/src/test/resources/env.json
+++ /dev/null
@@ -1,27 +0,0 @@
-{
-  "spark": {
-    "log.level": "ERROR",
-    "checkpoint.dir": "hdfs:///griffin/batch/cp",
-    "config": {}
-  },
-
-  "persist": [
-    {
-      "type": "hdfs",
-      "config": {
-        "path": "hdfs:///griffin/streaming/persist"
-      }
-    },
-    {
-      "type": "http",
-      "config": {
-        "method": "post",
-        "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/"
-      }
-    }
-  ],
-
-  "cleaner": {
-
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/env1.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/env1.json b/measure/measure-batch/src/test/resources/env1.json
deleted file mode 100644
index a059715..0000000
--- a/measure/measure-batch/src/test/resources/env1.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-  "spark": {
-    "log.level": "INFO",
-    "checkpoint.dir": "hdfs:///griffin/batch/cp",
-    "config": {}
-  },
-
-  "persist": [
-    {
-      "type": "hdfs",
-      "config": {
-        "path": "hdfs:///user/b_des/bark/griffin-batch/test",
-        "max.lines.per.file": 10000
-      }
-    }
-  ],
-
-  "cleaner": {
-
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/log4j.properties b/measure/measure-batch/src/test/resources/log4j.properties
deleted file mode 100644
index bd31e15..0000000
--- a/measure/measure-batch/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-log4j.rootLogger=INFO, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/users_info_src.avro
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_src.avro b/measure/measure-batch/src/test/resources/users_info_src.avro
deleted file mode 100644
index 3d5c939..0000000
Binary files a/measure/measure-batch/src/test/resources/users_info_src.avro and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/users_info_src.dat
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_src.dat b/measure/measure-batch/src/test/resources/users_info_src.dat
deleted file mode 100644
index ce49443..0000000
--- a/measure/measure-batch/src/test/resources/users_info_src.dat
+++ /dev/null
@@ -1,50 +0,0 @@
-10001|Tom001|Jerrya|201 DisneyCity|tomajerrya001@dc.org|10000001|94022
-10002|Tom002|Jerrya|202 DisneyCity|tomajerrya002@dc.org|10000002|94022
-10003|Tom003|Jerrya|203 DisneyCity|tomajerrya003@dc.org|10000003|94022
-10004|Tom004|Jerrya|204 DisneyCity|tomajerrya004@dc.org|10000004|94022
-10005|Tom005|Jerrya|205 DisneyCity|tomajerrya005@dc.org|10000005|94022
-10006|Tom006|Jerrya|206 DisneyCity|tomajerrya006@dc.org|10000006|94022
-10007|Tom007|Jerrya|207 DisneyCity|tomajerrya007@dc.org|10000007|94022
-10008|Tom008|Jerrya|208 DisneyCity|tomajerrya008@dc.org|10000008|94022
-10009|Tom009|Jerrya|209 DisneyCity|tomajerrya009@dc.org|10000009|94022
-10010|Tom010|Jerrya|210 DisneyCity|tomajerrya010@dc.org|10000010|94022
-10011|Tom011|Jerrya|211 DisneyCity|tomajerrya011@dc.org|10000011|94022
-10012|Tom012|Jerrya|212 DisneyCity|tomajerrya012@dc.org|10000012|94022
-10013|Tom013|Jerrya|213 DisneyCity|tomajerrya013@dc.org|10000013|94022
-10014|Tom014|Jerrya|214 DisneyCity|tomajerrya014@dc.org|10000014|94022
-10015|Tom015|Jerrya|215 DisneyCity|tomajerrya015@dc.org|10000015|94022
-10016|Tom016|Jerrya|216 DisneyCity|tomajerrya016@dc.org|10000016|94022
-10017|Tom017|Jerrya|217 DisneyCity|tomajerrya017@dc.org|10000017|94022
-10018|Tom018|Jerrya|218 DisneyCity|tomajerrya018@dc.org|10000018|94022
-10019|Tom019|Jerrya|219 DisneyCity|tomajerrya019@dc.org|10000019|94022
-10020|Tom020|Jerrya|220 DisneyCity|tomajerrya020@dc.org|10000020|94022
-10021|Tom021|Jerrya|221 DisneyCity|tomajerrya021@dc.org|10000021|94022
-10022|Tom022|Jerrya|222 DisneyCity|tomajerrya022@dc.org|10000022|94022
-10023|Tom023|Jerrya|223 DisneyCity|tomajerrya023@dc.org|10000023|94022
-10024|Tom024|Jerrya|224 DisneyCity|tomajerrya024@dc.org|10000024|94022
-10025|Tom025|Jerrya|225 DisneyCity|tomajerrya025@dc.org|10000025|94022
-10026|Tom026|Jerrya|226 DisneyCity|tomajerrya026@dc.org|10000026|94022
-10027|Tom027|Jerrya|227 DisneyCity|tomajerrya027@dc.org|10000027|94022
-10028|Tom028|Jerrya|228 DisneyCity|tomajerrya028@dc.org|10000028|94022
-10029|Tom029|Jerrya|229 DisneyCity|tomajerrya029@dc.org|10000029|94022
-10030|Tom030|Jerrya|230 DisneyCity|tomajerrya030@dc.org|10000030|94022
-10031|Tom031|Jerrya|231 DisneyCity|tomajerrya031@dc.org|10000031|94022
-10032|Tom032|Jerrya|232 DisneyCity|tomajerrya032@dc.org|10000032|94022
-10033|Tom033|Jerrya|233 DisneyCity|tomajerrya033@dc.org|10000033|94022
-10034|Tom034|Jerrya|234 DisneyCity|tomajerrya034@dc.org|10000034|94022
-10035|Tom035|Jerrya|235 DisneyCity|tomajerrya035@dc.org|10000035|94022
-10036|Tom036|Jerrya|236 DisneyCity|tomajerrya036@dc.org|10000036|94022
-10037|Tom037|Jerrya|237 DisneyCity|tomajerrya037@dc.org|10000037|94022
-10038|Tom038|Jerrya|238 DisneyCity|tomajerrya038@dc.org|10000038|94022
-10039|Tom039|Jerrya|239 DisneyCity|tomajerrya039@dc.org|10000039|94022
-10040|Tom040|Jerrya|240 DisneyCity|tomajerrya040@dc.org|10000040|94022
-10041|Tom041|Jerrya|241 DisneyCity|tomajerrya041@dc.org|10000041|94022
-10042|Tom042|Jerrya|242 DisneyCity|tomajerrya042@dc.org|10000042|94022
-10043|Tom043|Jerrya|243 DisneyCity|tomajerrya043@dc.org|10000043|94022
-10044|Tom044|Jerrya|244 DisneyCity|tomajerrya044@dc.org|10000044|94022
-10045|Tom045|Jerrya|245 DisneyCity|tomajerrya045@dc.org|10000045|94022
-10046|Tom046|Jerrya|246 DisneyCity|tomajerrya046@dc.org|10000046|94022
-10047|Tom047|Jerrya|247 DisneyCity|tomajerrya047@dc.org|10000047|94022
-10048|Tom048|Jerrya|248 DisneyCity|tomajerrya048@dc.org|10000048|94022
-10049|Tom049|Jerrya|249 DisneyCity|tomajerrya049@dc.org|10000049|94022
-10050|Tom050|Jerrya|250 DisneyCity|tomajerrya050@dc.org|10000050|94022
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/test/resources/users_info_target.avro
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/test/resources/users_info_target.avro b/measure/measure-batch/src/test/resources/users_info_target.avro
deleted file mode 100644
index 104dd6c..0000000
Binary files a/measure/measure-batch/src/test/resources/users_info_target.avro and /dev/null differ