You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/06/02 09:31:51 UTC

[2/6] incubator-griffin git commit: griffin-measure package modification

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
new file mode 100644
index 0000000..d824506
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
@@ -0,0 +1,21 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+
+trait AnalyzableExpr extends Serializable {
+  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = Nil
+  def getWhenClauseExpr(): Option[LogicalExpr] = None
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
new file mode 100644
index 0000000..776a22d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
@@ -0,0 +1,29 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+trait Cacheable extends DataSourceable {
+  protected def cacheUnit: Boolean = false
+  def cacheable(ds: String): Boolean = {
+    cacheUnit && !conflict() && ((ds.isEmpty && dataSources.isEmpty) || (ds.nonEmpty && contains(ds)))
+  }
+  protected def getCacheExprs(ds: String): Iterable[Cacheable]
+
+  protected def persistUnit: Boolean = false
+  def persistable(ds: String): Boolean = {
+    persistUnit && ((ds.isEmpty && dataSources.isEmpty) || (ds.nonEmpty && contains(ds)))
+  }
+  protected def getPersistExprs(ds: String): Iterable[Cacheable]
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
new file mode 100644
index 0000000..1bbb123
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
@@ -0,0 +1,21 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+trait Calculatable extends Serializable {
+
+  def calculate(values: Map[String, Any]): Option[Any]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
new file mode 100644
index 0000000..c517e13
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
@@ -0,0 +1,24 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+trait DataSourceable extends Serializable {
+  val dataSources: Set[String]
+  protected def conflict(): Boolean = dataSources.size > 1
+  def contains(ds: String): Boolean = dataSources.contains(ds)
+  def dataSourceOpt: Option[String] = {
+    if (dataSources.size == 1) Some(dataSources.head) else None
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
new file mode 100644
index 0000000..5c91ecd
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
@@ -0,0 +1,29 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+trait Describable extends Serializable {
+
+  val desc: String
+
+  protected def describe(v: Any): String = {
+    v match {
+      case s: Describable => s"${s.desc}"
+      case s: String => s"'${s}'"
+      case a => s"${a}"
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
new file mode 100644
index 0000000..140c6d7
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
@@ -0,0 +1,47 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+trait Expr extends Serializable with Describable with Cacheable with Calculatable {
+
+  protected val _defaultId: String = ExprIdCounter.emptyId
+
+  val _id = ExprIdCounter.genId(_defaultId)
+
+  protected def getSubCacheExprs(ds: String): Iterable[Expr] = Nil
+  final def getCacheExprs(ds: String): Iterable[Expr] = {
+    if (cacheable(ds)) getSubCacheExprs(ds).toList :+ this else getSubCacheExprs(ds)
+  }
+
+  protected def getSubFinalCacheExprs(ds: String): Iterable[Expr] = Nil
+  final def getFinalCacheExprs(ds: String): Iterable[Expr] = {
+    if (cacheable(ds)) Nil :+ this else getSubFinalCacheExprs(ds)
+  }
+
+  protected def getSubPersistExprs(ds: String): Iterable[Expr] = Nil
+  final def getPersistExprs(ds: String): Iterable[Expr] = {
+    if (persistable(ds)) getSubPersistExprs(ds).toList :+ this else getSubPersistExprs(ds)
+  }
+
+  final def calculate(values: Map[String, Any]): Option[Any] = {
+    values.get(_id) match {
+      case Some(v) => Some(v)
+      case _ => calculateOnly(values)
+    }
+  }
+  protected def calculateOnly(values: Map[String, Any]): Option[Any]
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
new file mode 100644
index 0000000..b830bb9
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
@@ -0,0 +1,36 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+trait ExprDescOnly extends Describable {
+
+}
+
+
+case class SelectionHead(expr: String) extends ExprDescOnly {
+  private val headRegex = """\$(\w+)""".r
+  val head: String = expr match {
+    case headRegex(v) => v.toLowerCase
+    case _ => expr
+  }
+  val desc: String = "$" + head
+}
+
+case class RangeDesc(elements: Iterable[MathExpr]) extends ExprDescOnly {
+  val desc: String = {
+    val rangeDesc = elements.map(_.desc).mkString(", ")
+    s"(${rangeDesc})"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
new file mode 100644
index 0000000..2eb4475
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
@@ -0,0 +1,56 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.{Set => MutableSet}
+
+object ExprIdCounter {
+
+  private val idCounter: AtomicLong = new AtomicLong(0L)
+
+  private val existIdSet: MutableSet[String] = MutableSet.empty[String]
+
+  private val invalidIdRegex = """^\d+$""".r
+
+  val emptyId: String = ""
+
+  def genId(defaultId: String): String = {
+    defaultId match {
+      case emptyId => increment.toString
+      case invalidIdRegex() => increment.toString
+//      case defId if (exist(defId)) => s"${increment}#${defId}"
+      case defId if (exist(defId)) => s"${defId}"
+      case _ => {
+        insertUserId(defaultId)
+        defaultId
+      }
+    }
+  }
+
+  private def exist(id: String): Boolean = {
+    existIdSet.contains(id)
+  }
+
+  private def insertUserId(id: String): Unit = {
+    existIdSet += id
+  }
+
+  private def increment(): Long = {
+    idCounter.incrementAndGet()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
new file mode 100644
index 0000000..d28f54f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
@@ -0,0 +1,54 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+import scala.util.{Success, Try}
+
+trait FieldDescOnly extends Describable with DataSourceable {
+
+}
+
+case class IndexDesc(expr: String) extends FieldDescOnly {
+  val index: Int = {
+    Try(expr.toInt) match {
+      case Success(v) => v
+      case _ => throw new Exception(s"${expr} is invalid index")
+    }
+  }
+  val desc: String = describe(index)
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class FieldDesc(expr: String) extends FieldDescOnly {
+  val field: String = expr
+  val desc: String = describe(field)
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class AllFieldsDesc(expr: String) extends FieldDescOnly {
+  val allFields: String = expr
+  val desc: String = allFields
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class FieldRangeDesc(startField: FieldDescOnly, endField: FieldDescOnly) extends FieldDescOnly {
+  val desc: String = {
+    (startField, endField) match {
+      case (f1: IndexDesc, f2: IndexDesc) => s"(${f1.desc}, ${f2.desc})"
+      case _ => throw new Exception("invalid field range description")
+    }
+  }
+  val dataSources: Set[String] = Set.empty[String]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
new file mode 100644
index 0000000..2d004da
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
@@ -0,0 +1,92 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+import scala.util.{Failure, Success, Try}
+
+trait LiteralExpr extends Expr {
+  val value: Option[Any]
+  def calculateOnly(values: Map[String, Any]): Option[Any] = value
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class LiteralStringExpr(expr: String) extends LiteralExpr {
+  val value: Option[String] = Some(expr)
+  val desc: String = s"'${value.getOrElse("")}'"
+}
+
+case class LiteralNumberExpr(expr: String) extends LiteralExpr {
+  val value: Option[Any] = {
+    if (expr.contains(".")) {
+      Try (expr.toDouble) match {
+        case Success(v) => Some(v)
+        case _ => throw new Exception(s"${expr} is invalid number")
+      }
+    } else {
+      Try (expr.toLong) match {
+        case Success(v) => Some(v)
+        case _ => throw new Exception(s"${expr} is invalid number")
+      }
+    }
+  }
+  val desc: String = value.getOrElse("").toString
+}
+
+case class LiteralTimeExpr(expr: String) extends LiteralExpr {
+  final val TimeRegex = """(\d+)(d|h|m|s|ms)""".r
+  val value: Option[Long] = {
+    Try {
+      expr match {
+        case TimeRegex(time, unit) => {
+          val t = time.toLong
+          unit match {
+            case "d" => t * 24 * 60 * 60 * 1000
+            case "h" => t * 60 * 60 * 1000
+            case "m" => t * 60 * 1000
+            case "s" => t * 1000
+            case "ms" => t
+            case _ => throw new Exception(s"${expr} is invalid time format")
+          }
+        }
+        case _ => throw new Exception(s"${expr} is invalid time format")
+      }
+    } match {
+      case Success(v) => Some(v)
+      case Failure(ex) => throw ex
+    }
+  }
+  val desc: String = expr
+}
+
+case class LiteralBooleanExpr(expr: String) extends LiteralExpr {
+  final val TrueRegex = """(?i)true""".r
+  final val FalseRegex = """(?i)false""".r
+  val value: Option[Boolean] = expr match {
+    case TrueRegex() => Some(true)
+    case FalseRegex() => Some(false)
+    case _ => throw new Exception(s"${expr} is invalid boolean")
+  }
+  val desc: String = value.getOrElse("").toString
+}
+
+case class LiteralNullExpr(expr: String) extends LiteralExpr {
+  val value: Option[Any] = Some(null)
+  val desc: String = "null"
+}
+
+case class LiteralNoneExpr(expr: String) extends LiteralExpr {
+  val value: Option[Any] = None
+  val desc: String = "none"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
new file mode 100644
index 0000000..d6ce357
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
@@ -0,0 +1,173 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+import org.apache.griffin.measure.batch.rule.CalculationUtil._
+
+trait LogicalExpr extends Expr with AnalyzableExpr {
+  override def cacheUnit: Boolean = true
+}
+
+case class LogicalSimpleExpr(expr: MathExpr) extends LogicalExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+  val desc: String = expr.desc
+  val dataSources: Set[String] = expr.dataSources
+  override def cacheUnit: Boolean = false
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = expr.getCacheExprs(ds)
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = expr.getFinalCacheExprs(ds)
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = expr.getPersistExprs(ds)
+}
+
+case class LogicalCompareExpr(left: MathExpr, compare: String, right: MathExpr) extends LogicalExpr {
+  private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=")
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val (lv, rv) = (left.calculate(values), right.calculate(values))
+    compare match {
+      case this.eqOpr() => lv === rv
+      case this.neqOpr() => lv =!= rv
+      case this.btOpr => lv > rv
+      case this.bteOpr => lv >= rv
+      case this.ltOpr => lv < rv
+      case this.lteOpr => lv <= rv
+      case _ => None
+    }
+  }
+  val desc: String = s"${left.desc} ${compare} ${right.desc}"
+  val dataSources: Set[String] = left.dataSources ++ right.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    left.getCacheExprs(ds) ++ right.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    left.getFinalCacheExprs(ds) ++ right.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    left.getPersistExprs(ds) ++ right.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
+    if (compare == "=" || compare == "==") {
+      (left.dataSourceOpt, right.dataSourceOpt) match {
+        case (Some(dsPair._1), Some(dsPair._2)) => (left, right) :: Nil
+        case (Some(dsPair._2), Some(dsPair._1)) => (right, left) :: Nil
+        case _ => Nil
+      }
+    } else Nil
+  }
+}
+
+case class LogicalRangeExpr(left: MathExpr, rangeOpr: String, range: RangeDesc) extends LogicalExpr {
+  private val (inOpr, ninOpr, btwnOpr, nbtwnOpr) = ("""(?i)in""".r, """(?i)not\s+in""".r, """(?i)between""".r, """(?i)not\s+between""".r)
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val (lv, rvs) = (left.calculate(values), range.elements.map(_.calculate(values)))
+    rangeOpr match {
+      case this.inOpr() => lv in rvs
+      case this.ninOpr() => lv not_in rvs
+      case this.btwnOpr() => lv between rvs
+      case this.nbtwnOpr() => lv not_between rvs
+      case _ => None
+    }
+  }
+  val desc: String = s"${left.desc} ${rangeOpr} ${range.desc}"
+  val dataSources: Set[String] = left.dataSources ++ range.elements.flatMap(_.dataSources).toSet
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    left.getCacheExprs(ds) ++ range.elements.flatMap(_.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    left.getFinalCacheExprs(ds) ++ range.elements.flatMap(_.getFinalCacheExprs(ds))
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    left.getPersistExprs(ds) ++ range.elements.flatMap(_.getPersistExprs(ds))
+  }
+}
+
+// -- logical statement --
+//case class LogicalFactorExpr(self: LogicalExpr) extends LogicalExpr {
+//  def calculate(values: Map[String, Any]): Option[Any] = self.calculate(values)
+//  val desc: String = self.desc
+//}
+
+case class UnaryLogicalExpr(oprList: Iterable[String], factor: LogicalExpr) extends LogicalExpr {
+  private val notOpr = """(?i)not|!""".r
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = factor.calculate(values)
+    oprList.foldRight(fv) { (opr, v) =>
+      opr match {
+        case this.notOpr() => !v
+        case _ => None
+      }
+    }
+  }
+  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev} ${ex}" }
+  val dataSources: Set[String] = factor.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    factor.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
+    val notOprList = oprList.filter { opr =>
+      opr match {
+        case this.notOpr() => true
+        case _ => false
+      }
+    }
+    if (notOprList.size % 2 == 0) factor.getGroupbyExprPairs(dsPair) else Nil
+  }
+}
+
+case class BinaryLogicalExpr(first: LogicalExpr, others: Iterable[(String, LogicalExpr)]) extends LogicalExpr {
+  private val (andOpr, orOpr) = ("""(?i)and|&&""".r, """(?i)or|\|\|""".r)
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = first.calculate(values)
+    others.foldLeft(fv) { (v, pair) =>
+      val (opr, next) = pair
+      val nv = next.calculate(values)
+      opr match {
+        case this.andOpr() => v && nv
+        case this.orOpr() => v || nv
+        case _ => None
+      }
+    }
+  }
+  val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
+  val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
+    if (others.isEmpty) first.getGroupbyExprPairs(dsPair)
+    else {
+      val isAnd = others.exists(_._1 match {
+        case this.andOpr() => true
+        case _ => false
+      })
+      if (isAnd) {
+        first.getGroupbyExprPairs(dsPair) ++ others.flatMap(_._2.getGroupbyExprPairs(dsPair))
+      } else Nil
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
new file mode 100644
index 0000000..3d584dc
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
@@ -0,0 +1,93 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+import org.apache.griffin.measure.batch.rule.CalculationUtil._
+
+trait MathExpr extends Expr {
+
+}
+
+case class MathFactorExpr(self: Expr) extends MathExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values)
+  val desc: String = self.desc
+  val dataSources: Set[String] = self.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    self.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    self.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    self.getPersistExprs(ds)
+  }
+}
+
+case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr {
+  private val (posOpr, negOpr) = ("+", "-")
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = factor.calculate(values)
+    oprList.foldRight(fv) { (opr, v) =>
+      opr match {
+        case this.posOpr => v
+        case this.negOpr => -v
+        case _ => None
+      }
+    }
+  }
+  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" }
+  val dataSources: Set[String] = factor.dataSources
+  override def cacheUnit: Boolean = true
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    factor.getPersistExprs(ds)
+  }
+}
+
+case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr {
+  private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%")
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = first.calculate(values)
+    others.foldLeft(fv) { (v, pair) =>
+      val (opr, next) = pair
+      val nv = next.calculate(values)
+      opr match {
+        case this.addOpr => v + nv
+        case this.subOpr => v - nv
+        case this.mulOpr => v * nv
+        case this.divOpr => v / nv
+        case this.modOpr => v % nv
+        case _ => None
+      }
+    }
+  }
+  val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
+  val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
+  override def cacheUnit: Boolean = true
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
new file mode 100644
index 0000000..a25c9e2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
@@ -0,0 +1,67 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+trait SelectExpr extends Expr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = None
+}
+
+case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr {
+  val desc: String = s"[${fields.map(_.desc).mkString(", ")}]"
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr {
+  val desc: String = s".${func}(${args.map(_.desc).mkString(", ")})"
+  val dataSources: Set[String] = args.flatMap(_.dataSources).toSet
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds))
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds))
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds))
+}
+
+case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr {
+  val desc: String = s"[${field.desc} ${compare} ${value.desc}]"
+  val dataSources: Set[String] = value.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds)
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds)
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds)
+}
+
+// -- selection --
+case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id)
+
+  val desc: String = {
+    val argsString = selectors.map(_.desc).mkString("")
+    s"${head.desc}${argsString}"
+  }
+  val dataSources: Set[String] = {
+    val selectorDataSources = selectors.flatMap(_.dataSources).toSet
+    selectorDataSources + head.head
+  }
+
+  override def cacheUnit: Boolean = true
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    selectors.flatMap(_.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    selectors.flatMap(_.getFinalCacheExprs(ds))
+  }
+
+  override def persistUnit: Boolean = true
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    selectors.flatMap(_.getPersistExprs(ds))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
new file mode 100644
index 0000000..868d3bf
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
@@ -0,0 +1,66 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule.expr
+
+
+trait StatementExpr extends Expr with AnalyzableExpr {
+  def valid(values: Map[String, Any]): Boolean = true
+  override def cacheUnit: Boolean = true
+}
+
+case class SimpleStatementExpr(expr: LogicalExpr) extends StatementExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+  val desc: String = expr.desc
+  val dataSources: Set[String] = expr.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    expr.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = expr.getGroupbyExprPairs(dsPair)
+}
+
+case class WhenClauseStatementExpr(expr: LogicalExpr, whenExpr: LogicalExpr) extends StatementExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+  val desc: String = s"${expr.desc} when ${whenExpr.desc}"
+
+  override def valid(values: Map[String, Any]): Boolean = {
+    whenExpr.calculate(values) match {
+      case Some(r: Boolean) => r
+      case _ => false
+    }
+  }
+
+  val dataSources: Set[String] = expr.dataSources ++ whenExpr.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getCacheExprs(ds) ++ whenExpr.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getFinalCacheExprs(ds) ++ whenExpr.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    expr.getPersistExprs(ds) ++ whenExpr.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
+    expr.getGroupbyExprPairs(dsPair) ++ whenExpr.getGroupbyExprPairs(dsPair)
+  }
+  override def getWhenClauseExpr(): Option[LogicalExpr] = Some(whenExpr)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
new file mode 100644
index 0000000..27531e8
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
@@ -0,0 +1,76 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
+
+object HdfsUtil {
+
+  private val seprator = "/"
+
+  private val conf = new Configuration()
+  conf.set("dfs.support.append", "true")
+
+  private val dfs = FileSystem.get(conf)
+
+  def existPath(filePath: String): Boolean = {
+    val path = new Path(filePath)
+    dfs.exists(path)
+  }
+
+  def createFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+    return dfs.create(path)
+  }
+
+  def appendOrCreateFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
+  }
+
+  def openFile(filePath: String): FSDataInputStream = {
+    val path = new Path(filePath)
+    dfs.open(path)
+  }
+
+  def writeContent(filePath: String, message: String): Unit = {
+    val out = createFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def appendContent(filePath: String, message: String): Unit = {
+    val out = appendOrCreateFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def createEmptyFile(filePath: String): Unit = {
+    val out = createFile(filePath)
+    out.close
+  }
+
+
+  def getHdfsFilePath(parentPath: String, fileName: String): String = {
+    if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + seprator + fileName
+  }
+
+  def deleteHdfsPath(dirPath: String): Unit = {
+    val path = new Path(dirPath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
new file mode 100644
index 0000000..e0be422
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
@@ -0,0 +1,44 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.utils
+
+import scalaj.http._
+
+object HttpUtil {
+
+  val GET_REGEX = """^(?i)get$""".r
+  val POST_REGEX = """^(?i)post$""".r
+  val PUT_REGEX = """^(?i)put$""".r
+  val DELETE_REGEX = """^(?i)delete$""".r
+
+  def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
+    val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
+    response.code.toString
+  }
+
+  def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
+    val httpReq = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers))
+    method match {
+      case POST_REGEX() => httpReq.postData(data).asString.code.toString
+      case PUT_REGEX() => httpReq.put(data).asString.code.toString
+      case _ => "wrong method"
+    }
+  }
+
+  private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = {
+    map.map(pair => pair._1 -> pair._2.toString)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
new file mode 100644
index 0000000..d3a0697
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
@@ -0,0 +1,46 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.utils
+
+import java.io.InputStream
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import scala.reflect._
+
+object JsonUtil {
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
+  def toJson(value: Map[Symbol, Any]): String = {
+    toJson(value map { case (k,v) => k.name -> v})
+  }
+
+  def toJson(value: Any): String = {
+    mapper.writeValueAsString(value)
+  }
+
+  def toMap[V](json:String)(implicit m: Manifest[V]) = fromJson[Map[String,V]](json)
+
+  def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = {
+    mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]])
+  }
+
+  def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = {
+    mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/config-profile.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-profile.json b/measure/src/test/resources/config-profile.json
new file mode 100644
index 0000000..6b82d7f
--- /dev/null
+++ b/measure/src/test/resources/config-profile.json
@@ -0,0 +1,17 @@
+{
+  "name": "prof1",
+  "type": "profile",
+
+  "source": {
+    "type": "avro",
+    "version": "1.7",
+    "config": {
+      "file.name": "src/test/resources/users_info_src.avro"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 1,
+    "rules": "$source.post_code == null"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config.json b/measure/src/test/resources/config.json
new file mode 100644
index 0000000..65e0ed9
--- /dev/null
+++ b/measure/src/test/resources/config.json
@@ -0,0 +1,25 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "source": {
+    "type": "avro",
+    "version": "1.7",
+    "config": {
+      "file.name": "src/test/resources/users_info_src.avro"
+    }
+  },
+
+  "target": {
+    "type": "avro",
+    "version": "1.7",
+    "config": {
+      "file.name": "src/test/resources/users_info_target.avro"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 1,
+    "rules": "$source.user_id > 10020 AND $source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/config1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config1.json b/measure/src/test/resources/config1.json
new file mode 100644
index 0000000..d7290ba
--- /dev/null
+++ b/measure/src/test/resources/config1.json
@@ -0,0 +1,27 @@
+{
+  "name": "accu-test",
+  "type": "accuracy",
+
+  "source": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "table.name": "rheos_view_event",
+      "partitions": "dt=20170410, hour=15"
+    }
+  },
+
+  "target": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "table.name": "be_view_event_queue",
+      "partitions": "dt=20170410, hour=15; dt=20170410, hour=16"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 1,
+    "rules": "@Key ${source}['uid'] === ${target}['uid']; @Key ${source}['eventtimestamp'] === ${target}['eventtimestamp']; ${source}['page_id'] === ${target}['page_id']; ${source}['site_id'] === ${target}['site_id']; ${source}['itm'] === ${target}['itm']"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/env.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env.json b/measure/src/test/resources/env.json
new file mode 100644
index 0000000..3a9e38c
--- /dev/null
+++ b/measure/src/test/resources/env.json
@@ -0,0 +1,27 @@
+{
+  "spark": {
+    "log.level": "ERROR",
+    "checkpoint.dir": "hdfs:///griffin/batch/cp",
+    "config": {}
+  },
+
+  "persist": [
+    {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///griffin/streaming/persist"
+      }
+    },
+    {
+      "type": "http",
+      "config": {
+        "method": "post",
+        "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/"
+      }
+    }
+  ],
+
+  "cleaner": {
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/env1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env1.json b/measure/src/test/resources/env1.json
new file mode 100644
index 0000000..a059715
--- /dev/null
+++ b/measure/src/test/resources/env1.json
@@ -0,0 +1,21 @@
+{
+  "spark": {
+    "log.level": "INFO",
+    "checkpoint.dir": "hdfs:///griffin/batch/cp",
+    "config": {}
+  },
+
+  "persist": [
+    {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///user/b_des/bark/griffin-batch/test",
+        "max.lines.per.file": 10000
+      }
+    }
+  ],
+
+  "cleaner": {
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/log4j.properties b/measure/src/test/resources/log4j.properties
new file mode 100644
index 0000000..bd31e15
--- /dev/null
+++ b/measure/src/test/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/users_info_src.avro
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/users_info_src.avro b/measure/src/test/resources/users_info_src.avro
new file mode 100644
index 0000000..3d5c939
Binary files /dev/null and b/measure/src/test/resources/users_info_src.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/users_info_src.dat
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/users_info_src.dat b/measure/src/test/resources/users_info_src.dat
new file mode 100644
index 0000000..ce49443
--- /dev/null
+++ b/measure/src/test/resources/users_info_src.dat
@@ -0,0 +1,50 @@
+10001|Tom001|Jerrya|201 DisneyCity|tomajerrya001@dc.org|10000001|94022
+10002|Tom002|Jerrya|202 DisneyCity|tomajerrya002@dc.org|10000002|94022
+10003|Tom003|Jerrya|203 DisneyCity|tomajerrya003@dc.org|10000003|94022
+10004|Tom004|Jerrya|204 DisneyCity|tomajerrya004@dc.org|10000004|94022
+10005|Tom005|Jerrya|205 DisneyCity|tomajerrya005@dc.org|10000005|94022
+10006|Tom006|Jerrya|206 DisneyCity|tomajerrya006@dc.org|10000006|94022
+10007|Tom007|Jerrya|207 DisneyCity|tomajerrya007@dc.org|10000007|94022
+10008|Tom008|Jerrya|208 DisneyCity|tomajerrya008@dc.org|10000008|94022
+10009|Tom009|Jerrya|209 DisneyCity|tomajerrya009@dc.org|10000009|94022
+10010|Tom010|Jerrya|210 DisneyCity|tomajerrya010@dc.org|10000010|94022
+10011|Tom011|Jerrya|211 DisneyCity|tomajerrya011@dc.org|10000011|94022
+10012|Tom012|Jerrya|212 DisneyCity|tomajerrya012@dc.org|10000012|94022
+10013|Tom013|Jerrya|213 DisneyCity|tomajerrya013@dc.org|10000013|94022
+10014|Tom014|Jerrya|214 DisneyCity|tomajerrya014@dc.org|10000014|94022
+10015|Tom015|Jerrya|215 DisneyCity|tomajerrya015@dc.org|10000015|94022
+10016|Tom016|Jerrya|216 DisneyCity|tomajerrya016@dc.org|10000016|94022
+10017|Tom017|Jerrya|217 DisneyCity|tomajerrya017@dc.org|10000017|94022
+10018|Tom018|Jerrya|218 DisneyCity|tomajerrya018@dc.org|10000018|94022
+10019|Tom019|Jerrya|219 DisneyCity|tomajerrya019@dc.org|10000019|94022
+10020|Tom020|Jerrya|220 DisneyCity|tomajerrya020@dc.org|10000020|94022
+10021|Tom021|Jerrya|221 DisneyCity|tomajerrya021@dc.org|10000021|94022
+10022|Tom022|Jerrya|222 DisneyCity|tomajerrya022@dc.org|10000022|94022
+10023|Tom023|Jerrya|223 DisneyCity|tomajerrya023@dc.org|10000023|94022
+10024|Tom024|Jerrya|224 DisneyCity|tomajerrya024@dc.org|10000024|94022
+10025|Tom025|Jerrya|225 DisneyCity|tomajerrya025@dc.org|10000025|94022
+10026|Tom026|Jerrya|226 DisneyCity|tomajerrya026@dc.org|10000026|94022
+10027|Tom027|Jerrya|227 DisneyCity|tomajerrya027@dc.org|10000027|94022
+10028|Tom028|Jerrya|228 DisneyCity|tomajerrya028@dc.org|10000028|94022
+10029|Tom029|Jerrya|229 DisneyCity|tomajerrya029@dc.org|10000029|94022
+10030|Tom030|Jerrya|230 DisneyCity|tomajerrya030@dc.org|10000030|94022
+10031|Tom031|Jerrya|231 DisneyCity|tomajerrya031@dc.org|10000031|94022
+10032|Tom032|Jerrya|232 DisneyCity|tomajerrya032@dc.org|10000032|94022
+10033|Tom033|Jerrya|233 DisneyCity|tomajerrya033@dc.org|10000033|94022
+10034|Tom034|Jerrya|234 DisneyCity|tomajerrya034@dc.org|10000034|94022
+10035|Tom035|Jerrya|235 DisneyCity|tomajerrya035@dc.org|10000035|94022
+10036|Tom036|Jerrya|236 DisneyCity|tomajerrya036@dc.org|10000036|94022
+10037|Tom037|Jerrya|237 DisneyCity|tomajerrya037@dc.org|10000037|94022
+10038|Tom038|Jerrya|238 DisneyCity|tomajerrya038@dc.org|10000038|94022
+10039|Tom039|Jerrya|239 DisneyCity|tomajerrya039@dc.org|10000039|94022
+10040|Tom040|Jerrya|240 DisneyCity|tomajerrya040@dc.org|10000040|94022
+10041|Tom041|Jerrya|241 DisneyCity|tomajerrya041@dc.org|10000041|94022
+10042|Tom042|Jerrya|242 DisneyCity|tomajerrya042@dc.org|10000042|94022
+10043|Tom043|Jerrya|243 DisneyCity|tomajerrya043@dc.org|10000043|94022
+10044|Tom044|Jerrya|244 DisneyCity|tomajerrya044@dc.org|10000044|94022
+10045|Tom045|Jerrya|245 DisneyCity|tomajerrya045@dc.org|10000045|94022
+10046|Tom046|Jerrya|246 DisneyCity|tomajerrya046@dc.org|10000046|94022
+10047|Tom047|Jerrya|247 DisneyCity|tomajerrya047@dc.org|10000047|94022
+10048|Tom048|Jerrya|248 DisneyCity|tomajerrya048@dc.org|10000048|94022
+10049|Tom049|Jerrya|249 DisneyCity|tomajerrya049@dc.org|10000049|94022
+10050|Tom050|Jerrya|250 DisneyCity|tomajerrya050@dc.org|10000050|94022
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/users_info_target.avro
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/users_info_target.avro b/measure/src/test/resources/users_info_target.avro
new file mode 100644
index 0000000..104dd6c
Binary files /dev/null and b/measure/src/test/resources/users_info_target.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/resources/users_info_target.dat
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/users_info_target.dat b/measure/src/test/resources/users_info_target.dat
new file mode 100644
index 0000000..07a6b40
--- /dev/null
+++ b/measure/src/test/resources/users_info_target.dat
@@ -0,0 +1,50 @@
+10001|Tom001|Jerrya|201 DisneyCity|tomajerrya001@dc.org|10000101|94022
+10002|Tom002|Jerrya|202 DisneyCity|tomajerrya002@dc.org|10000102|94022
+10003|Tom003|Jerrya|203 DisneyCity|tomajerrya003@dc.org|10000003|94022
+10004|Tom004|Jerrya|204 DisneyCity|tomajerrya004@dc.org|10000004|94022
+10005|Tom005|Jerrya|205 DisneyCity|tomajerrya005@dc.org|10000005|94022
+10006|Tom006|Jerrya|206 DisneyCity|tomajerrya006@dc.org|10000006|94022
+10007|Tom007|Jerrya|207 DisneyCity|tomajerrya007@dc.org|10000007|94022
+10008|Tom008|Jerrya|208 DisneyCity|tomajerrya008@dc.org|10000008|94022
+10009|Tom009|Jerrya|209 DisneyCity|tomajerrya009@dc.org|10000009|94022
+10010|Tom010|Jerrya|210 DisneyCity|tomajerrya010@dc.org|10000010|94022
+10011|Tom011|Jerrya|211 DisneyCity|tomajerrya011@dc.org|10000011|94022
+10012|Tom012|Jerrya|212 DisneyCity|tomajerrya012@dc.org|10000012|94022
+10013|Tom013|Jerrya|213 DisneyCity|tomajerrya013@dc.org|10000013|94022
+10014|Tom014|Jerrya|214 DisneyCity|tomajerrya014@dc.org|10000014|94022
+10015|Tom015|Jerrya|215 DisneyCity|tomajerrya015@dc.org|10000015|94022
+10016|Tom016|Jerrya|216 DisneyCity|tomajerrya016@dc.org|10000016|94022
+10017|Tom017|Jerrya|217 DisneyCity|tomajerrya017@dc.org|10000017|94022
+10018|Tom018|Jerrya|218 DisneyCity|tomajerrya018@dc.org|10000018|94022
+10019|Tom019|Jerrya|219 DisneyCity|tomajerrya019@dc.org|10000019|94022
+10020|Tom020|Jerrya|220 DisneyCity|tomajerrya020@dc.org|10000020|94022
+10021|Tom021|Jerrya|221 DisneyCity|tomajerrya021@dc.org|10000021|94022
+10022|Tom022|Jerrya|222 DisneyCity|tomajerrya022@dc.org|10000022|94022
+10023|Tom023|Jerrya|223 DisneyCity|tomajerrya023@dc.org|10000023|94022
+10024|Tom024|Jerrya|224 DisneyCity|tomajerrya024@dc.org|10000024|94022
+10025|Tom025|Jerrya|225 DisneyCity|tomajerrya025@dc.org|10000025|94022
+10026|Tom026|Jerrya|226 DisneyCity|tomajerrya026@dc.org|10000026|94022
+10027|Tom027|Jerrya|227 DisneyCity|tomajerrya027@dc.org|10000027|94022
+10028|Tom028|Jerrya|228 DisneyCity|tomajerrya028@dc.org|10000028|94022
+10029|Tom029|Jerrya|229 DisneyCity|tomajerrya029@dc.org|10000029|94022
+10030|Tom030|Jerrya|230 DisneyCity|tomajerrya030@dc.org|10000030|94022
+10031|Tom031|Jerrya|231 DisneyCity|tomajerrya031@dc.org|10000031|94022
+10032|Tom032|Jerrya|232 DisneyCity|tomajerrya032@dc.org|10000032|94022
+10033|Tom033|Jerrya|233 DisneyCity|tomajerrya033@dc.org|10000033|94022
+10034|Tom034|Jerrya|234 DisneyCity|tomajerrya034@dc.org|10000034|94022
+10035|Tom035|Jerrya|235 DisneyCity|tomajerrya035@dc.org|10000035|94022
+10036|Tom036|Jerrya|236 DisneyCity|tomajerrya036@dc.org|10000036|94022
+10037|Tom037|Jerrya|237 DisneyCity|tomajerrya037@dc.org|10000037|94022
+10038|Tom038|Jerrya|238 DisneyCity|tomajerrya038@dc.org|10000038|94022
+10039|Tom039|Jerrya|239 DisneyCity|tomajerrya039@dc.org|10000039|94022
+10040|Tom040|Jerrya|240 DisneyCity|tomajerrya040@dc.org|10000040|94022
+10041|Tom041|Jerrya|241 DisneyCity|tomajerrya041@dc.org|10000041|94022
+10042|Tom042|Jerrya|242 DisneyCity|tomajerrya042@dc.org|10000042|94022
+10043|Tom043|Jerrya|243 DisneyCity|tomajerrya043@dc.org|10000043|94022
+10044|Tom044|Jerrya|244 DisneyCity|tomajerrya044@dc.org|10000044|94022
+10045|Tom045|Jerrya|245 DisneyCity|tomajerrya045@dc.org|10000045|94022
+10046|Tom046|Jerrya|246 DisneyCity|tomajerrya046@dc.org|10000046|94022
+10047|Tom047|Jerrya|247 DisneyCity|tomajerrya047@dc.org|10000047|94022
+10048|Tom048|Jerrya|248 DisneyCity|tomajerrya048@dc.org|10000048|94022
+10049|Tom049|Jerrya|249 DisneyCity|tomajerrya049@dc.org|10000049|94022
+10050|Tom050|Jerrya|250 DisneyCity|tomajerrya050@dc.org|10000050|94022
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
new file mode 100644
index 0000000..b611f3c
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
@@ -0,0 +1,188 @@
+///*-
+// * Licensed under the Apache License, Version 2.0 (the "License");
+//you may not use this file except in compliance with the License.
+//You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing, software
+//distributed under the License is distributed on an "AS IS" BASIS,
+//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//See the License for the specific language governing permissions and
+//limitations under the License.
+//
+// */
+// package org.apache.griffin.measure.batch.algo
+//
+//import java.util.Date
+//
+//import org.apache.griffin.measure.batch.config.params._
+//import org.apache.griffin.measure.batch.config.params.env._
+//import org.apache.griffin.measure.batch.config.params.user._
+//import org.apache.griffin.measure.batch.config.reader._
+//import org.apache.griffin.measure.batch.config.validator._
+//import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
+//import org.apache.griffin.measure.batch.log.Loggable
+//import org.apache.griffin.measure.batch.rule.expr._
+//import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+//  val envFile = "src/test/resources/env.json"
+////  val confFile = "src/test/resources/config.json"
+//  val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}"
+//  val envFsType = "local"
+//  val userFsType = "raw"
+//
+//  val args = Array(envFile, confFile)
+//
+//  var sc: SparkContext = _
+//  var sqlContext: SQLContext = _
+//
+//  var allParam: AllParam = _
+//
+//  before {
+//    // read param files
+//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    allParam = AllParam(envParam, userParam)
+//
+//    // validate param files
+//    validateParams(allParam) match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-3)
+//      }
+//      case _ => {
+//        info("params validation pass")
+//      }
+//    }
+//
+//    val metricName = userParam.name
+//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+//    sc = new SparkContext(conf)
+//    sqlContext = new SQLContext(sc)
+//  }
+//
+//  test("algorithm") {
+//    Try {
+//      val envParam = allParam.envParam
+//      val userParam = allParam.userParam
+//
+//      // start time
+//      val startTime = new Date().getTime()
+//
+//      // get spark application id
+//      val applicationId = sc.applicationId
+//
+//      // rules
+//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+//      val rule: StatementExpr = ruleFactory.generateRule()
+//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+//      ruleAnalyzer.constCacheExprs.foreach(println)
+//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
+//
+//      // global cache data
+//      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//
+//      // data connector
+//      val sourceDataConnector: DataConnector =
+//        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+//          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("source data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//      val targetDataConnector: DataConnector =
+//        DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam,
+//          ruleAnalyzer.targetRuleExprs, finalConstExprValueMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("target data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//
+//      // get metadata
+////      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
+////        case Success(md) => md
+////        case Failure(ex) => throw ex
+////      }
+////      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
+////        case Success(md) => md
+////        case Failure(ex) => throw ex
+////      }
+//
+//      // get data
+//      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//      val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//
+//      // my algo
+//      val algo = BatchAccuracyAlgo(allParam)
+//
+//      // accuracy algorithm
+//      val (accuResult, missingRdd, matchedRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer)
+//
+//      println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}")
+//
+//      missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)).foreach(println)
+//
+//      // end time
+//      val endTime = new Date().getTime
+//      println(s"using time: ${endTime - startTime} ms")
+//    } match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-4)
+//      }
+//      case _ => {
+//        info("calculation finished")
+//      }
+//    }
+//  }
+//
+//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+//    paramReader.readConfig[T]
+//  }
+//
+//  private def validateParams(allParam: AllParam): Try[Boolean] = {
+//    val allParamValidator = AllParamValidator()
+//    allParamValidator.validate(allParam)
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
new file mode 100644
index 0000000..ce59af0
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
@@ -0,0 +1,163 @@
+///*-
+// * Licensed under the Apache License, Version 2.0 (the "License");
+//you may not use this file except in compliance with the License.
+//You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing, software
+//distributed under the License is distributed on an "AS IS" BASIS,
+//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//See the License for the specific language governing permissions and
+//limitations under the License.
+//
+// */
+//package org.apache.griffin.measure.batch.algo
+//
+//import java.util.Date
+//
+//import org.apache.griffin.measure.batch.config.params._
+//import org.apache.griffin.measure.batch.config.params.env._
+//import org.apache.griffin.measure.batch.config.params.user._
+//import org.apache.griffin.measure.batch.config.reader._
+//import org.apache.griffin.measure.batch.config.validator._
+//import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
+//import org.apache.griffin.measure.batch.log.Loggable
+//import org.apache.griffin.measure.batch.rule.expr._
+//import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+//  val envFile = "src/test/resources/env.json"
+//  val confFile = "src/test/resources/config-profile.json"
+//  val envFsType = "local"
+//  val userFsType = "local"
+//
+//  val args = Array(envFile, confFile)
+//
+//  var sc: SparkContext = _
+//  var sqlContext: SQLContext = _
+//
+//  var allParam: AllParam = _
+//
+//  before {
+//    // read param files
+//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    allParam = AllParam(envParam, userParam)
+//
+//    // validate param files
+//    validateParams(allParam) match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-3)
+//      }
+//      case _ => {
+//        info("params validation pass")
+//      }
+//    }
+//
+//    val metricName = userParam.name
+//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+//    sc = new SparkContext(conf)
+//    sqlContext = new SQLContext(sc)
+//  }
+//
+//  test("algorithm") {
+//    Try {
+//      val envParam = allParam.envParam
+//      val userParam = allParam.userParam
+//
+//      // start time
+//      val startTime = new Date().getTime()
+//
+//      // get spark application id
+//      val applicationId = sc.applicationId
+//
+//      // rules
+//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+//      val rule: StatementExpr = ruleFactory.generateRule()
+//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+//      ruleAnalyzer.constCacheExprs.foreach(println)
+//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
+//
+//      // global cache data
+//      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//
+//      // data connector
+//      val sourceDataConnector: DataConnector =
+//        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
+//          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("source data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//
+//      // get data
+//      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//
+//      // my algo
+//      val algo = BatchProfileAlgo(allParam)
+//
+//      // profile algorithm
+//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
+//
+//      println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
+//
+//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
+//
+//      // end time
+//      val endTime = new Date().getTime
+//      println(s"using time: ${endTime - startTime} ms")
+//    } match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-4)
+//      }
+//      case _ => {
+//        info("calculation finished")
+//      }
+//    }
+//  }
+//
+//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+//    paramReader.readConfig[T]
+//  }
+//
+//  private def validateParams(allParam: AllParam): Try[Boolean] = {
+//    val allParamValidator = AllParamValidator()
+//    allParamValidator.validate(allParam)
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala
new file mode 100644
index 0000000..ad109d2
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala
@@ -0,0 +1,85 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo.core
+
+import org.apache.griffin.measure.batch.config.params.user.EvaluateRuleParam
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.griffin.measure.batch.rule.{RuleAnalyzer, RuleFactory}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.PrivateMethodTester
+
+@RunWith(classOf[JUnitRunner])
+class AccuracyCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
+
+  def findExprId(exprs: Iterable[Expr], desc: String): String = {
+    exprs.find(_.desc == desc) match {
+      case Some(expr) => expr._id
+      case _ => ""
+    }
+  }
+
+  test ("match data success") {
+    val rule = "$source.name = $target.name AND $source.age < $target.age"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
+    ), Map[String, Any]())
+    val target = (Map[String, Any](
+      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
+      (findExprId(targetPersistExprs, "$target['age']") -> 27)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
+    result._1 should be (true)
+    result._2.size should be (0)
+  }
+
+  test ("match data fail") {
+    val rule = "$source.name = $target.name AND $source.age = $target.age"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
+    ), Map[String, Any]())
+    val target = (Map[String, Any](
+      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
+      (findExprId(targetPersistExprs, "$target['age']") -> 27)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
+    result._1 should be (false)
+    result._2.size shouldNot be (0)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/ProfileCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/ProfileCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/ProfileCoreTest.scala
new file mode 100644
index 0000000..e1eeb11
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/ProfileCoreTest.scala
@@ -0,0 +1,75 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.algo.core
+
+import org.apache.griffin.measure.batch.config.params.user.EvaluateRuleParam
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.griffin.measure.batch.rule.{RuleAnalyzer, RuleFactory}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.PrivateMethodTester
+
+@RunWith(classOf[JUnitRunner])
+class ProfileCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
+
+  def findExprId(exprs: Iterable[Expr], desc: String): String = {
+    exprs.find(_.desc == desc) match {
+      case Some(expr) => expr._id
+      case _ => ""
+    }
+  }
+
+  test ("match data success") {
+    val rule = "$source.name = 'jack' AND $source.age = null"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> null)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
+    result._1 should be (true)
+    result._2.size should be (0)
+  }
+
+  test ("match data fail") {
+    val rule = "$source.name = 'jack' AND $source.age != null"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> null)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
+    result._1 should be (false)
+    result._2.size shouldNot be (0)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala
new file mode 100644
index 0000000..2b46c72
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala
@@ -0,0 +1,34 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.reader
+
+import org.apache.griffin.measure.batch.config.params.env._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfter {
+
+  test("read config") {
+    val rawString = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}"""
+
+    val reader = ParamRawStringReader(rawString)
+    val paramTry = reader.readConfig[PersistParam]
+    paramTry.isSuccess should be (true)
+    paramTry.get should be (PersistParam("hdfs", Map[String, Any](("path" -> "/path/to"), ("time" -> 1234567))))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala
new file mode 100644
index 0000000..923746d
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala
@@ -0,0 +1,36 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.validator
+
+import org.apache.griffin.measure.batch.config.params._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+import org.scalamock.scalatest.MockFactory
+
+@RunWith(classOf[JUnitRunner])
+class AllParamValidatorTest extends FlatSpec with Matchers with BeforeAndAfter with MockFactory {
+
+  "validate" should "pass" in {
+    val validator = AllParamValidator()
+    val paramMock = mock[Param]
+    paramMock.validate _ expects () returning (false)
+
+    val validateTry = validator.validate(paramMock)
+    validateTry.isSuccess should be (true)
+    validateTry.get should be (false)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala
new file mode 100644
index 0000000..327632a
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala
@@ -0,0 +1,44 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.persist
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Try, Failure}
+
+@RunWith(classOf[JUnitRunner])
+class HdfsPersistTest extends FunSuite with Matchers with BeforeAndAfter {
+
+  val config: Map[String, Any] = Map[String, Any](
+    ("path" -> "/path/to"), ("max.persist.lines" -> 100), ("max.lines.per.file" -> 1000))
+  val metricName: String = "metric"
+  val timeStamp: Long = 123456789L
+
+  val hdfsPersist = HdfsPersist(config, metricName, timeStamp)
+
+  test ("constructor") {
+    hdfsPersist.path should be ("/path/to")
+    hdfsPersist.maxPersistLines should be (100)
+    hdfsPersist.maxLinesPerFile should be (1000)
+
+    hdfsPersist.StartFile should be (s"/path/to/${metricName}/${timeStamp}/_START")
+  }
+
+  test ("avaiable") {
+    hdfsPersist.available should be (true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala
new file mode 100644
index 0000000..7f2b4b5
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala
@@ -0,0 +1,38 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.persist
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class HttpPersistTest extends FunSuite with Matchers with BeforeAndAfter {
+
+  val config: Map[String, Any] = Map[String, Any](("api" -> "url/api"), ("method" -> "post"))
+  val metricName: String = "metric"
+  val timeStamp: Long = 123456789L
+
+  val httpPersist = HttpPersist(config, metricName, timeStamp)
+
+  test ("constructor") {
+    httpPersist.api should be ("url/api")
+    httpPersist.method should be ("post")
+  }
+
+  test("available") {
+    httpPersist.available should be (true)
+  }
+}