Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:20 UTC

[05/11] incubator-griffin git commit: Dsl modify

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala
deleted file mode 100644
index 661e8f4..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.expr
-
-import org.apache.griffin.measure.rule.CalculationUtil._
-import org.apache.griffin.measure.rule.DataTypeCalculationUtil._
-import org.apache.spark.sql.types.DataType
-
-trait MathExpr extends Expr {
-
-}
-
-case class MathFactorExpr(self: Expr) extends MathExpr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values)
-  val desc: String = self.desc
-  val dataSources: Set[String] = self.dataSources
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    self.getCacheExprs(ds)
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    self.getFinalCacheExprs(ds)
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    self.getPersistExprs(ds)
-  }
-}
-
-case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr {
-  private val (posOpr, negOpr) = ("+", "-")
-  def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    val fv = factor.calculate(values)
-    oprList.foldRight(fv) { (opr, v) =>
-      opr match {
-        case this.posOpr => v
-        case this.negOpr => -v
-        case _ => None
-      }
-    }
-  }
-  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" }
-  val dataSources: Set[String] = factor.dataSources
-  override def cacheUnit: Boolean = true
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    factor.getCacheExprs(ds)
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    factor.getFinalCacheExprs(ds)
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    factor.getPersistExprs(ds)
-  }
-}
-
-case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr {
-  private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%")
-  def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    val fv = first.calculate(values)
-    others.foldLeft(fv) { (v, pair) =>
-      val (opr, next) = pair
-      val nv = next.calculate(values)
-      opr match {
-        case this.addOpr => v + nv
-        case this.subOpr => v - nv
-        case this.mulOpr => v * nv
-        case this.divOpr => v / nv
-        case this.modOpr => v % nv
-        case _ => None
-      }
-    }
-  }
-  val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
-  val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
-  override def cacheUnit: Boolean = true
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
-  }
-}
\ No newline at end of file
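
For context, the removed BinaryMathExpr evaluated an expression chain by folding left over its (operator, operand) pairs. A minimal self-contained sketch of that fold pattern, using Option[Double] in place of the project's CalculationUtil implicits (which lift arithmetic onto Option[Any]):

    object BinaryFoldSketch extends App {
      // Stand-in for CalculationUtil's implicit ops: a plain Option[Double] calculator.
      def calc(opr: String, l: Option[Double], r: Option[Double]): Option[Double] =
        (l, r) match {
          case (Some(a), Some(b)) => opr match {
            case "+" => Some(a + b)
            case "-" => Some(a - b)
            case "*" => Some(a * b)
            case "/" => Some(a / b)
            case "%" => Some(a % b)
            case _   => None        // unknown operator, as in BinaryMathExpr
          }
          case _ => None
        }

      val first: Option[Double] = Some(10.0)
      val others: Seq[(String, Option[Double])] = Seq("+" -> Some(5.0), "*" -> Some(2.0))

      // Left fold over (operator, operand) pairs, as in BinaryMathExpr.calculateOnly:
      val result = others.foldLeft(first) { case (v, (opr, nv)) => calc(opr, v, nv) }
      println(result) // Some(30.0): (10 + 5) * 2
    }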

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala
deleted file mode 100644
index 5b7f1b0..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.expr
-
-import org.apache.spark.sql.types.DataType
-import org.apache.griffin.measure.rule.CalculationUtil._
-
-trait SelectExpr extends Expr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = None
-}
-
-case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr {
-  val desc: String = s"[${fields.map(_.desc).mkString(", ")}]"
-  val dataSources: Set[String] = Set.empty[String]
-}
-
-case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr {
-  val desc: String = s".${func}(${args.map(_.desc).mkString(", ")})"
-  val dataSources: Set[String] = args.flatMap(_.dataSources).toSet
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds))
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds))
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds))
-}
-
-case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr {
-  val desc: String = s"[${field.desc} ${compare} ${value.desc}]"
-  val dataSources: Set[String] = value.dataSources
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds)
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds)
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds)
-  private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=")
-  override def calculateOnly(values: Map[String, Any]): Option[Any] = {
-    val (lv, rv) = (values.get(fieldKey), value.calculate(values))
-    compare match {
-      case this.eqOpr() => lv === rv
-      case this.neqOpr() => lv =!= rv
-      case this.btOpr => lv > rv
-      case this.bteOpr => lv >= rv
-      case this.ltOpr => lv < rv
-      case this.lteOpr => lv <= rv
-      case _ => None
-    }
-  }
-  def fieldKey: String = s"__${field.field}"
-}
-
-// -- selection --
-case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id)
-
-  val desc: String = {
-    val argsString = selectors.map(_.desc).mkString("")
-    s"${head.desc}${argsString}"
-  }
-  val dataSources: Set[String] = {
-    val selectorDataSources = selectors.flatMap(_.dataSources).toSet
-    selectorDataSources + head.head
-  }
-
-  override def cacheUnit: Boolean = true
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    selectors.flatMap(_.getCacheExprs(ds))
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    selectors.flatMap(_.getFinalCacheExprs(ds))
-  }
-
-  override def persistUnit: Boolean = true
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    selectors.flatMap(_.getPersistExprs(ds))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala
deleted file mode 100644
index 15161c3..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.func
-
-import org.apache.griffin.measure.utils.JsonUtil
-
-class DefaultFunctionDefine extends FunctionDefine {
-
-  def json(strOpt: Option[_]): Map[String, Any] = {
-    try {
-      strOpt match {
-        case Some(str: String) => JsonUtil.toAnyMap(str)
-        case _ => throw new Exception("json function param should be string")
-      }
-    } catch {
-      case e: Throwable => throw e
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala
deleted file mode 100644
index d23fc7a..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.func
-
-trait FunctionDefine extends Serializable {
-
-}
-
-class UnKnown {}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala
deleted file mode 100644
index 57e934d..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.func
-
-import java.lang.reflect.Method
-
-import org.apache.griffin.measure.log.Loggable
-
-import scala.collection.mutable.{Map => MutableMap}
-
-object FunctionUtil extends Loggable {
-
-  val functionDefines: MutableMap[String, FunctionDefine] = MutableMap[String, FunctionDefine]()
-
-  registerFunctionDefine(Array(classOf[DefaultFunctionDefine].getCanonicalName))
-
-  def registerFunctionDefine(classes: Iterable[String]): Unit = {
-    for (cls <- classes) {
-      try {
-        val clz: Class[_] = Class.forName(cls)
-        if (classOf[FunctionDefine].isAssignableFrom(clz)) {
-          functionDefines += (cls -> clz.newInstance.asInstanceOf[FunctionDefine])
-        } else {
-          warn(s"${cls} register fails: ${cls} is not sub class of ${classOf[FunctionDefine].getCanonicalName}")
-        }
-      } catch {
-        case e: Throwable => warn(s"${cls} register fails: ${e.getMessage}")
-      }
-    }
-  }
-
-  def invoke(methodName: String, params: Array[Option[Any]]): Seq[Option[Any]] = {
-//    val paramTypes = params.map { param =>
-//      try {
-//        param match {
-//          case Some(v) => v.getClass
-//          case _ => classOf[UnKnown]
-//        }
-//      } catch {
-//        case e: Throwable => classOf[UnKnown]
-//      }
-//    }
-    val paramTypes = params.map(a => classOf[Option[_]])
-
-    functionDefines.values.foldLeft(Nil: Seq[Option[Any]]) { (res, funcDef) =>
-      if (res.isEmpty) {
-        val clz = funcDef.getClass
-        try {
-          val method = clz.getMethod(methodName, paramTypes: _*)
-          Seq(Some(method.invoke(funcDef, params: _*)))
-        } catch {
-          case e: Throwable => res
-        }
-      } else res
-    }
-  }
-
-}
-
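
The removed FunctionUtil resolved functions reflectively: each registered FunctionDefine was probed for a method whose parameters are all Option[_]. A sketch of how the old API was driven, assuming the classes above on the classpath and JsonUtil.toAnyMap parsing into a Map[String, Any]:

    // Register a define class by name, then invoke by method name.
    FunctionUtil.registerFunctionDefine(Seq(classOf[DefaultFunctionDefine].getCanonicalName))
    // The probe builds an all-Option signature, so json(strOpt: Option[_]) matches.
    val res = FunctionUtil.invoke("json", Array[Option[Any]](Some("""{"a": 1}""")))
    println(res) // Seq(Some(Map(a -> 1))) if a define matched, Nil otherwise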

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
new file mode 100644
index 0000000..22d64d8
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.preproc
+
+object PreProcRuleGenerator {
+
+  val _name = "name"
+
+  def genPreProcRules(rules: Seq[Map[String, Any]], suffix: String): Seq[Map[String, Any]] = {
+    if (rules == null) Nil else {
+      rules.map { rule =>
+        genPreProcRule(rule, suffix)
+      }
+    }
+  }
+
+  def getRuleNames(rules: Seq[Map[String, Any]]): Seq[String] = {
+    if (rules == null) Nil else {
+      rules.flatMap { rule =>
+        rule.get(_name) match {
+          case Some(s: String) => Some(s)
+          case _ => None
+        }
+      }
+    }
+  }
+
+  private def genPreProcRule(param: Map[String, Any], suffix: String
+                            ): Map[String, Any] = {
+    val keys = param.keys
+    keys.foldLeft(param) { (map, key) =>
+      map.get(key) match {
+        case Some(s: String) => map + (key -> genNewString(s, suffix))
+        case Some(subMap: Map[String, Any]) => map + (key -> genPreProcRule(subMap, suffix))
+        case Some(arr: Seq[_]) => map + (key -> genPreProcRule(arr, suffix))
+        case _ => map
+      }
+    }
+  }
+
+  private def genPreProcRule(paramArr: Seq[Any], suffix: String): Seq[Any] = {
+    paramArr.foldLeft(Nil: Seq[Any]) { (res, param) =>
+      param match {
+        case s: String => res :+ genNewString(s, suffix)
+        case map: Map[String, Any] => res :+ genPreProcRule(map, suffix)
+        case arr: Seq[_] => res :+ genPreProcRule(arr, suffix)
+        case _ => res :+ param
+      }
+    }
+  }
+
+  private def genNewString(str: String, suffix: String): String = {
+    str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}")
+  }
+
+}
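
The generator suffixes every ${...} placeholder so that pre-process rules from different connector instances land in distinct names. A small usage sketch (the suffix value is illustrative):

    import org.apache.griffin.measure.rule.preproc.PreProcRuleGenerator

    val rules: Seq[Map[String, Any]] = Seq(
      Map("dsl.type" -> "spark-sql", "name" -> "${this}",
          "rule" -> "select name, age from ${s1}")
    )
    val renamed = PreProcRuleGenerator.genPreProcRules(rules, "0")
    // genNewString turns ${this} into this_0 and ${s1} into s1_0:
    // Seq(Map(dsl.type -> spark-sql, name -> this_0, rule -> select name, age from s1_0))
    println(renamed)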

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
new file mode 100644
index 0000000..4b3a4d4
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.rule.dsl._
+
+trait ConcreteRuleStep extends RuleStep {
+
+  val persistType: PersistType
+
+  val updateDataSource: Option[String]
+
+//  def isGroupMetric: Boolean = {
+//    val _GroupMetric = "group.metric"
+//    details.get(_GroupMetric) match {
+//      case Some(b: Boolean) => b
+//      case _ => false
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala
new file mode 100644
index 0000000..86f0bf3
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala
@@ -0,0 +1,29 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.rule.dsl._
+
+case class DfOprStep(name: String, rule: String, details: Map[String, Any],
+                     persistType: PersistType, updateDataSource: Option[String]
+                    ) extends ConcreteRuleStep {
+
+  val dslType: DslType = DfOprType
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
new file mode 100644
index 0000000..21db8cf
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.rule.dsl._
+
+case class GriffinDslStep(name: String, rule: String, dqType: DqType, details: Map[String, Any]
+                         ) extends RuleStep {
+
+  val dslType: DslType = GriffinDslType
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala
new file mode 100644
index 0000000..4675ffe
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.rule.dsl.{DslType, PersistType}
+
+trait RuleStep extends Serializable {
+
+  val dslType: DslType
+
+  val name: String
+  val rule: String
+  val details: Map[String, Any]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
new file mode 100644
index 0000000..62c3c35
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.persist._
+import org.apache.griffin.measure.rule.dsl._
+
+case class SparkSqlStep(name: String, rule: String, details: Map[String, Any],
+                        persistType: PersistType, updateDataSource: Option[String]
+                       ) extends ConcreteRuleStep {
+
+  val dslType: DslType = SparkSqlType
+
+}
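
Each concrete step pairs a named rule with its persist behavior, while the DSL type is fixed per class. A sketch of constructing the two concrete step kinds added above; MetricPersistType and CachePersistType are assumed names for the PersistType variants behind the "metric" and "cache" persist.type values in the JSON configs:

    import org.apache.griffin.measure.rule.dsl._
    import org.apache.griffin.measure.rule.step._

    val totalStep: ConcreteRuleStep = SparkSqlStep(
      name = "total",
      rule = "SELECT COUNT(*) AS `total` FROM source",
      details = Map.empty[String, Any],
      persistType = MetricPersistType,   // assumed PersistType member
      updateDataSource = None
    )
    val parseStep: ConcreteRuleStep = DfOprStep("${s1}", "from_json",
      Map("df.name" -> "${this}"), CachePersistType, None)  // assumed PersistType member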

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
new file mode 100644
index 0000000..11e8c8f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.udf
+
+import org.apache.spark.sql.SQLContext
+
+object GriffinUdfs {
+
+  def register(sqlContext: SQLContext): Unit = {
+    sqlContext.udf.register("index_of", indexOf)
+  }
+
+  private val indexOf = (arr: Seq[String], v: String) => {
+    arr.indexOf(v)
+  }
+
+}
\ No newline at end of file
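
index_of is registered as a Spark SQL UDF, so it becomes available to any spark-sql rule step. A minimal sketch of registering and calling it (table and column names are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.griffin.measure.rule.udf.GriffinUdfs

    val sc = new SparkContext(new SparkConf().setAppName("udf-demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    GriffinUdfs.register(sqlContext)

    import sqlContext.implicits._
    Seq((Seq("URL", "CRAWLMETADATA"), "CRAWLMETADATA")).toDF("names", "v")
      .registerTempTable("attrs")
    // index_of(arr, v) is Seq.indexOf: position of v in arr, or -1 when absent
    sqlContext.sql("SELECT index_of(names, v) AS idx FROM attrs").show() // idx = 1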

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
index 8a608ff..416f567 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
@@ -68,7 +68,7 @@ object HdfsFileDumpUtil {
 
   def remove(path: String, filename: String, withSuffix: Boolean): Unit = {
     if (withSuffix) {
-      val files = HdfsUtil.listSubPaths(path, "file")
+      val files = HdfsUtil.listSubPathsByType(path, "file")
       val patternFiles = files.filter(samePattern(_, filename))
       patternFiles.foreach { f =>
         val rmPath = HdfsUtil.getHdfsFilePath(path, f)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index 6dd54b7..9fa6bcf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.utils
 
+import org.apache.griffin.measure.log.Loggable
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
 
-object HdfsUtil {
+object HdfsUtil extends Loggable {
 
   private val seprator = "/"
 
@@ -32,8 +33,17 @@ object HdfsUtil {
   private val dfs = FileSystem.get(conf)
 
   def existPath(filePath: String): Boolean = {
-    val path = new Path(filePath)
-    dfs.exists(path)
+    try {
+      val path = new Path(filePath)
+      dfs.exists(path)
+    } catch {
+      case e: Throwable => false
+    }
+  }
+
+  def existFileInDir(dirPath: String, fileName: String): Boolean = {
+    val filePath = getHdfsFilePath(dirPath, fileName)
+    existPath(filePath)
   }
 
   def createFile(filePath: String): FSDataOutputStream = {
@@ -75,8 +85,12 @@ object HdfsUtil {
   }
 
   def deleteHdfsPath(dirPath: String): Unit = {
-    val path = new Path(dirPath)
-    if (dfs.exists(path)) dfs.delete(path, true)
+    try {
+      val path = new Path(dirPath)
+      if (dfs.exists(path)) dfs.delete(path, true)
+    } catch {
+      case e: Throwable => error(s"delete path [${dirPath}] error: ${e.getMessage}")
+    }
   }
 
 //  def listPathFiles(dirPath: String): Iterable[String] = {
@@ -96,25 +110,38 @@ object HdfsUtil {
 //    }
 //  }
 
-  def listSubPaths(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = {
-    val path = new Path(dirPath)
-    try {
-      val fileStatusArray = dfs.listStatus(path)
-      fileStatusArray.filter { fileStatus =>
-        subType match {
-          case "dir" => fileStatus.isDirectory
-          case "file" => fileStatus.isFile
-          case _ => true
+  def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = {
+    if (existPath(dirPath)) {
+      try {
+        val path = new Path(dirPath)
+        val fileStatusArray = dfs.listStatus(path)
+        fileStatusArray.filter { fileStatus =>
+          subType match {
+            case "dir" => fileStatus.isDirectory
+            case "file" => fileStatus.isFile
+            case _ => true
+          }
+        }.map { fileStatus =>
+          val fname = fileStatus.getPath.getName
+          if (fullPath) getHdfsFilePath(dirPath, fname) else fname
+        }
+      } catch {
+        case e: Throwable => {
+          warn(s"list path [${dirPath}] warn: ${e.getMessage}")
+          Nil
         }
-      }.map { fileStatus =>
-        val fname = fileStatus.getPath.getName
-        if (fullPath) getHdfsFilePath(dirPath, fname) else fname
-      }
-    } catch {
-      case e: Throwable => {
-        println(s"list path files error: ${e.getMessage}")
-        Nil
       }
+    } else Nil
+  }
+
+  def listSubPathsByTypes(dirPath: String, subTypes: Iterable[String], fullPath: Boolean = false): Iterable[String] = {
+    subTypes.flatMap { subType =>
+      listSubPathsByType(dirPath, subType, fullPath)
     }
   }
+
+  def fileNameFromPath(filePath: String): String = {
+    val path = new Path(filePath)
+    path.getName
+  }
 }
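
With the rename, listing is split into a per-type call plus a multi-type convenience wrapper. A usage sketch (paths are illustrative):

    // Only file names, relative to the directory:
    val files = HdfsUtil.listSubPathsByType("hdfs://localhost/griffin/streaming/dump/source", "file")
    // Dirs and files together, as full paths:
    val all = HdfsUtil.listSubPathsByTypes(
      "hdfs://localhost/griffin/streaming/dump/source", Seq("dir", "file"), fullPath = true)
    // A missing directory now yields Nil (existPath guard) instead of throwing.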

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
new file mode 100644
index 0000000..7954b6d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
@@ -0,0 +1,164 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+object ParamUtil {
+
+  implicit class ParamMap(params: Map[String, Any]) {
+    def getAny(key: String, defValue: Any): Any = {
+      params.get(key) match {
+        case Some(v) => v
+        case _ => defValue
+      }
+    }
+
+    def getAnyRef[T](key: String, defValue: T)(implicit m: Manifest[T]): T = {
+      params.get(key) match {
+        case Some(v: T) => v
+        case _ => defValue
+      }
+    }
+
+    def getString(key: String, defValue: String): String = {
+      try {
+        params.get(key) match {
+          case Some(v: String) => v.toString
+          case Some(v) => v.toString
+          case _ => defValue
+        }
+      } catch {
+        case _: Throwable => defValue
+      }
+    }
+
+    def getByte(key: String, defValue: Byte): Byte = {
+      try {
+        params.get(key) match {
+          case Some(v: String) => v.toByte
+          case Some(v: Byte) => v.toByte
+          case Some(v: Short) => v.toByte
+          case Some(v: Int) => v.toByte
+          case Some(v: Long) => v.toByte
+          case Some(v: Float) => v.toByte
+          case Some(v: Double) => v.toByte
+          case _ => defValue
+        }
+      } catch {
+        case _: Throwable => defValue
+      }
+    }
+
+    def getShort(key: String, defValue: Short): Short = {
+      try {
+        params.get(key) match {
+          case Some(v: String) => v.toShort
+          case Some(v: Byte) => v.toShort
+          case Some(v: Short) => v.toShort
+          case Some(v: Int) => v.toShort
+          case Some(v: Long) => v.toShort
+          case Some(v: Float) => v.toShort
+          case Some(v: Double) => v.toShort
+          case _ => defValue
+        }
+      } catch {
+        case _: Throwable => defValue
+      }
+    }
+
+    def getInt(key: String, defValue: Int): Int = {
+      try {
+        params.get(key) match {
+          case Some(v: String) => v.toInt
+          case Some(v: Byte) => v.toInt
+          case Some(v: Short) => v.toInt
+          case Some(v: Int) => v.toInt
+          case Some(v: Long) => v.toInt
+          case Some(v: Float) => v.toInt
+          case Some(v: Double) => v.toInt
+          case _ => defValue
+        }
+      } catch {
+        case _: Throwable => defValue
+      }
+    }
+
+    def getLong(key: String, defValue: Long): Long = {
+      try {
+        params.get(key) match {
+          case Some(v: String) => v.toLong
+          case Some(v: Byte) => v.toLong
+          case Some(v: Short) => v.toLong
+          case Some(v: Int) => v.toLong
+          case Some(v: Long) => v.toLong
+          case Some(v: Float) => v.toLong
+          case Some(v: Double) => v.toLong
+          case _ => defValue
+        }
+      } catch {
+        case _: Throwable => defValue
+      }
+    }
+
+    def getFloat(key: String, defValue: Float): Float = {
+      try {
+        params.get(key) match {
+          case Some(v: String) => v.toFloat
+          case Some(v: Byte) => v.toFloat
+          case Some(v: Short) => v.toFloat
+          case Some(v: Int) => v.toFloat
+          case Some(v: Long) => v.toFloat
+          case Some(v: Float) => v.toFloat
+          case Some(v: Double) => v.toFloat
+          case _ => defValue
+        }
+      } catch {
+        case _: Throwable => defValue
+      }
+    }
+
+    def getDouble(key: String, defValue: Double): Double = {
+      try {
+        params.get(key) match {
+          case Some(v: String) => v.toDouble
+          case Some(v: Byte) => v.toDouble
+          case Some(v: Short) => v.toDouble
+          case Some(v: Int) => v.toDouble
+          case Some(v: Long) => v.toDouble
+          case Some(v: Float) => v.toDouble
+          case Some(v: Double) => v.toDouble
+          case _ => defValue
+        }
+      } catch {
+        case _: Throwable => defValue
+      }
+    }
+
+    def getBoolean(key: String, defValue: Boolean): Boolean = {
+      try {
+        params.get(key) match {
+          case Some(v: String) => v.toBoolean
+          case _ => defValue
+        }
+      } catch {
+        case _: Throwable => defValue
+      }
+    }
+  }
+
+}
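
ParamMap is an implicit wrapper, so the typed getters become available on any Map[String, Any] once the object is imported. Sketch:

    import org.apache.griffin.measure.utils.ParamUtil._

    val params: Map[String, Any] = Map("max.log.lines" -> "100", "init.clear" -> "true")
    params.getInt("max.log.lines", 10)     // 100: string values are parsed with toInt
    params.getBoolean("init.clear", false) // true
    params.getLong("missing.key", 2L)      // 2: absent keys fall back to the default
    params.getInt("init.clear", -1)        // -1: "true".toInt throws, default returned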

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
index 0079d10..fe721d2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
@@ -22,8 +22,8 @@ import scala.util.{Failure, Success, Try}
 
 object TimeUtil {
 
-  final val TimeRegex = """([+\-]?\d+)(d|h|m|s|ms)""".r
-  final val PureTimeRegex = """([+\-]?\d+)""".r
+  final val TimeRegex = """^([+\-]?\d+)(d|h|m|s|ms)$""".r
+  final val PureTimeRegex = """^([+\-]?\d+)$""".r
 
   def milliseconds(timeString: String): Option[Long] = {
     val value: Option[Long] = {

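Anchoring the regexes makes the whole-string intent explicit. A Scala pattern match against a Regex already requires a full match, but the anchors also guard partial searches such as findFirstIn. Sketch of the behavior:

    val TimeRegex = """^([+\-]?\d+)(d|h|m|s|ms)$""".r

    "-2m" match {
      case TimeRegex(value, unit) => println(s"value=$value, unit=$unit") // value=-2, unit=m
      case _ => println("not a time string")
    }
    // With the anchors, findFirstIn no longer extracts a prefix from malformed input:
    println(TimeRegex.findFirstIn("10s_extra")) // None (the unanchored version returned Some("10s"))
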
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-accuracy-streaming-multids.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-accuracy-streaming-multids.json b/measure/src/test/resources/config-test-accuracy-streaming-multids.json
new file mode 100644
index 0000000..18532b0
--- /dev/null
+++ b/measure/src/test/resources/config-test-accuracy-streaming-multids.json
@@ -0,0 +1,144 @@
+{
+  "name": "accu_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "persist.type": "cache",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        },
+        {
+          "type": "text-dir",
+          "config": {
+            "dir.path": "hdfs://localhost/griffin/text",
+            "data.dir.depth": 0,
+            "success.file": "_SUCCESS",
+            "done.file": "_DONE"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "persist.type": "cache",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${t1}",
+              "rule": "from_json",
+              "persist.type": "cache",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${t1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "rule": "source.name = target.name and source.age = target.age",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "miss.records": {
+            "name": "miss.records",
+            "persist.type": "record",
+            "update.data.source": "source"
+          },
+          "accuracy": {
+            "name": "accu",
+            "persist.type": "metric"
+          },
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-accuracy-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-accuracy-streaming.json b/measure/src/test/resources/config-test-accuracy-streaming.json
new file mode 100644
index 0000000..276f8dd
--- /dev/null
+++ b/measure/src/test/resources/config-test-accuracy-streaming.json
@@ -0,0 +1,119 @@
+{
+  "name": "accu_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "persist.type": "cache",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${t1}",
+              "rule": "from_json",
+              "persist.type": "cache",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${t1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "rule": "source.name = target.name and source.age = target.age",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "miss.records": {
+            "name": "miss.records",
+            "persist.type": "record",
+            "update.data.source": "source"
+          },
+          "accuracy": {
+            "name": "accu",
+            "persist.type": "metric"
+          },
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-accuracy.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-accuracy.json b/measure/src/test/resources/config-test-accuracy.json
new file mode 100644
index 0000000..ecbdaaa
--- /dev/null
+++ b/measure/src/test/resources/config-test-accuracy.json
@@ -0,0 +1,56 @@
+{
+  "name": "accu_batch_test",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "src",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }, {
+      "name": "tgt",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_target.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name AND src.address = tgt.address AND src.email = tgt.email AND src.phone = tgt.phone AND src.post_code = tgt.post_code",
+        "details": {
+          "source": "src",
+          "target": "tgt",
+          "miss.records": {
+            "name": "miss.records",
+            "persist.type": "record"
+          },
+          "accuracy": {
+            "name": "accu",
+            "persist.type": "metric"
+          },
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-profiling-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-profiling-streaming.json b/measure/src/test/resources/config-test-profiling-streaming.json
new file mode 100644
index 0000000..b2a74b8
--- /dev/null
+++ b/measure/src/test/resources/config-test-profiling-streaming.json
@@ -0,0 +1,68 @@
+{
+  "name": "prof_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "persist.type": "cache",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "rule": "source.name.count(), source.age.avg(), source.age.max(), source.age.min() group by source.name",
+        "details": {
+          "source": "source",
+          "profiling": {
+            "name": "prof",
+            "persist.type": "metric"
+          }
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-profiling.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-profiling.json b/measure/src/test/resources/config-test-profiling.json
new file mode 100644
index 0000000..187e88a
--- /dev/null
+++ b/measure/src/test/resources/config-test-profiling.json
@@ -0,0 +1,37 @@
+{
+  "name": "prof_batch_test",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "rule": "user_id as id, user_id.approx_count_distinct() as cnt group by user_id order by cnt desc, id desc limit 3",
+        "details": {
+          "source": "source",
+          "profiling": {
+            "name": "count",
+            "persist.type": "metric"
+          }
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test.json b/measure/src/test/resources/config-test.json
new file mode 100644
index 0000000..23eb5ff
--- /dev/null
+++ b/measure/src/test/resources/config-test.json
@@ -0,0 +1,55 @@
+{
+  "name": "accu batch test",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_target.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "miss.records",
+        "rule": "SELECT source.user_id, source.first_name, source.last_name, source.address, source.email, source.phone, source.post_code FROM source LEFT JOIN target ON coalesce(source.user_id, 'null') = coalesce(target.user_id, 'null') AND coalesce(source.first_name, 'null') = coalesce(target.first_name, 'null') AND coalesce(source.last_name, 'null') = coalesce(target.last_name, 'null') AND coalesce(source.address, 'null') = coalesce(target.address, 'null') AND coalesce(source.email, 'null') = coalesce(target.email, 'null') AND coalesce(source.phone, 'null') = coalesce(target.phone, 'null') AND coalesce(source.post_code, 'null') = coalesce(target.post_code, 'null') WHERE (NOT (source.user_id IS NULL AND source.first_name IS NULL AND source.last_name IS NULL AND source.address IS NULL AND source.email IS NULL AND source.phone IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND target.last_name IS NULL AND target.address IS NULL AND t
 arget.email IS NULL AND target.phone IS NULL AND target.post_code IS NULL)",
+        "persist.type": "record"
+      }, {
+        "dsl.type": "spark-sql",
+        "name": "miss",
+        "rule": "SELECT COUNT(*) AS `miss` FROM `miss.records`",
+      }, {
+        "dsl.type": "spark-sql",
+        "name": "total",
+        "rule": "SELECT COUNT(*) AS `total` FROM source",
+      }, {
+        "dsl.type": "spark-sql",
+        "name": "accuracy",
+        "rule": "SELECT `total`.`total` AS `total`, `miss`.`miss` AS `miss`, (`total`.`total` - `miss`.`miss`) AS `matched` FROM total JOIN miss",
+        "persist.type": "metric"
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test1.json b/measure/src/test/resources/config-test1.json
new file mode 100644
index 0000000..53a8765
--- /dev/null
+++ b/measure/src/test/resources/config-test1.json
@@ -0,0 +1,96 @@
+{
+  "name": "accu batch test",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "hive",
+          "version": "1.2",
+          "config": {
+            "database": "default",
+            "table.name": "src"
+          }
+        }
+      ]
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "hive",
+          "version": "1.2",
+          "config": {
+            "database": "default",
+            "table.name": "tgt"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "df-opr",
+        "name": "source",
+        "rule": "from_json",
+        "details": {
+          "df.name": "source"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "seeds",
+        "rule": "SELECT explode(seeds) as seed FROM source"
+      },
+      {
+        "dsl.type": "df-opr",
+        "name": "seeds",
+        "rule": "from_json",
+        "details": {
+          "df.name": "seeds",
+          "col.name": "seed"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "source",
+        "rule": "SELECT url, get_json_object(metadata, '$.tracker.crawlRequestCreateTS') AS ts FROM seeds"
+      },
+      {
+        "dsl.type": "spark-opr",
+        "name": "target",
+        "rule": "from_json(target.value)"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "attrs",
+        "rule": "SELECT groups[0].attrsList AS attrs FROM target"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "target",
+        "rule": "SELECT attrs.values[index_of(attrs.name, 'URL')][0] AS url, get_json_object(attrs.values[index_of(attrs.name, 'CRAWLMETADATA')][0], '$.tracker.crawlRequestCreateTS') AS ts FROM df2"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "miss.record",
+        "rule": "SELECT source.url, source.ts FROM source LEFT JOIN target ON coalesce(source.url, '') = coalesce(target.url, '') AND coalesce(source.ts, '') = coalesce(target.ts, '') WHERE (NOT (source.url IS NULL AND source.ts IS NULL)) AND (target.url IS NULL AND target.ts IS NULL)",
+        "persist.type": "record"
+      }, {
+        "dsl.type": "spark-sql",
+        "name": "miss.count",
+        "rule": "SELECT COUNT(*) AS `miss.count` FROM `miss.record`",
+        "persist.type": "metric"
+      }, {
+        "dsl.type": "spark-sql",
+        "name": "total.count",
+        "rule": "SELECT COUNT(*) AS `total.count` FROM source",
+        "persist.type": "metric"
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config.json b/measure/src/test/resources/config.json
index 08a6021..0a17474 100644
--- a/measure/src/test/resources/config.json
+++ b/measure/src/test/resources/config.json
@@ -22,6 +22,6 @@
 
   "evaluateRule": {
     "sampleRatio": 1,
-    "rules": "$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code WHEN $source.user_id > 10015"
+    "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/env-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json
index 42b4aa9..a01348f 100644
--- a/measure/src/test/resources/env-streaming.json
+++ b/measure/src/test/resources/env-streaming.json
@@ -5,6 +5,7 @@
     "batch.interval": "2s",
     "process.interval": "10s",
     "config": {
+      "spark.master": "local[*]",
       "spark.task.maxFailures": 5,
       "spark.streaming.kafkaMaxRatePerPartition": 1000,
       "spark.streaming.concurrentJobs": 4,

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/env-test.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-test.json b/measure/src/test/resources/env-test.json
new file mode 100644
index 0000000..603fad8
--- /dev/null
+++ b/measure/src/test/resources/env-test.json
@@ -0,0 +1,38 @@
+{
+  "spark": {
+    "log.level": "WARN",
+    "checkpoint.dir": "hdfs:///griffin/batch/cp",
+    "batch.interval": "10s",
+    "process.interval": "10m",
+    "config": {
+      "spark.master": "local[*]"
+    }
+  },
+
+  "persist": [
+    {
+      "type": "log",
+      "config": {
+        "max.log.lines": 100
+      }
+    }
+  ],
+
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "localhost:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
+      }
+    }
+  ],
+
+  "cleaner": {
+
+  }
+}
\ No newline at end of file