Posted to commits@griffin.apache.org by gu...@apache.org on 2017/06/02 09:31:52 UTC

[3/6] incubator-griffin git commit: griffin-measure package modification

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
new file mode 100644
index 0000000..3480c1f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
@@ -0,0 +1,27 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.params.user
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class DataConnectorParam( @JsonProperty("type") conType: String,
+                               @JsonProperty("version") version: String,
+                               @JsonProperty("config") config: Map[String, Any]
+                             ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
new file mode 100644
index 0000000..aff113b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
@@ -0,0 +1,26 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.params.user
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class EvaluateRuleParam( @JsonProperty("sampleRatio") sampleRatio: Double,
+                              @JsonProperty("rules") rules: String
+                            ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
new file mode 100644
index 0000000..b116361
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
@@ -0,0 +1,29 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.params.user
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class UserParam(@JsonProperty("name") name: String,
+                     @JsonProperty("type") dqType: String,
+                     @JsonProperty("source") sourceParam: DataConnectorParam,
+                     @JsonProperty("target") targetParam: DataConnectorParam,
+                     @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam
+                    ) extends Param {
+
+}
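
Putting the three param classes together, a hypothetical measure config would deserialize onto UserParam through the @JsonProperty names above. The field values, and the rules expression in particular, are illustrative only; the config keys ("database", "table.name", "file.path", "file.name") match the connectors added later in this commit:

    {
      "name": "accuracy_measure",
      "type": "accuracy",
      "source": {
        "type": "hive",
        "version": "1.2",
        "config": { "database": "db", "table.name": "users_src" }
      },
      "target": {
        "type": "avro",
        "version": "",
        "config": { "file.path": "hdfs:///griffin/data/", "file.name": "users_tgt.avro" }
      },
      "evaluateRule": {
        "sampleRatio": 1.0,
        "rules": "$source.id = $target.id"
      }
    }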

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
new file mode 100644
index 0000000..94b3352
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
@@ -0,0 +1,34 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.reader
+
+import org.apache.griffin.measure.batch.config.params.Param
+import org.apache.griffin.measure.batch.utils.JsonUtil
+
+import scala.util.Try
+
+case class ParamFileReader(file: String) extends ParamReader {
+
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
+    Try {
+      val source = scala.io.Source.fromFile(file)
+      val lines = source.mkString
+      val param = JsonUtil.fromJson[T](lines)
+      source.close
+      param
+    }
+  }
+
+}
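
A minimal usage sketch, assuming a JSON file matching UserParam (the path is hypothetical):

    val paramTry: Try[UserParam] =
      ParamFileReader("/conf/user-config.json").readConfig[UserParam]

One caveat: if JsonUtil.fromJson throws, the Try captures the failure but source.close is never reached, so the file handle leaks; wrapping the parse in try/finally would make the reader safe on malformed input.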

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
new file mode 100644
index 0000000..2755e13
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
@@ -0,0 +1,34 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.reader
+
+import org.apache.griffin.measure.batch.config.params.Param
+import org.apache.griffin.measure.batch.utils.JsonUtil
+import org.apache.griffin.measure.batch.utils.HdfsUtil
+
+import scala.util.Try
+
+case class ParamHdfsFileReader(filePath: String) extends ParamReader {
+
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
+    Try {
+      val source = HdfsUtil.openFile(filePath)
+      val param = JsonUtil.fromJson[T](source)
+      source.close
+      param
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
new file mode 100644
index 0000000..0aef699
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
@@ -0,0 +1,31 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.reader
+
+import org.apache.griffin.measure.batch.config.params.Param
+import org.apache.griffin.measure.batch.utils.JsonUtil
+
+import scala.util.Try
+
+case class ParamRawStringReader(rawString: String) extends ParamReader {
+
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
+    Try {
+      val param = JsonUtil.fromJson[T](rawString)
+      param
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
new file mode 100644
index 0000000..9e9366d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
@@ -0,0 +1,26 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.reader
+
+import org.apache.griffin.measure.batch.log.Loggable
+import org.apache.griffin.measure.batch.config.params.Param
+
+import scala.util.Try
+
+trait ParamReader extends Loggable with Serializable {
+
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
new file mode 100644
index 0000000..bfa6a68
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
@@ -0,0 +1,36 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.reader
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+
+object ParamReaderFactory {
+
+  val RawStringRegex = """^(?i)raw$""".r
+  val LocalFsRegex = """^(?i)local$""".r
+  val HdfsFsRegex = """^(?i)hdfs$""".r
+
+  def getParamReader(filePath: String, fsType: String): ParamReader = {
+    fsType match {
+      case RawStringRegex() => ParamRawStringReader(filePath)
+      case LocalFsRegex() => ParamFileReader(filePath)
+      case HdfsFsRegex() => ParamHdfsFileReader(filePath)
+      case _ => ParamHdfsFileReader(filePath)
+    }
+  }
+
+}
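
The (?i) flag makes fsType matching case-insensitive; for the raw case the filePath argument carries the JSON string itself, and any unrecognized fsType falls back to the HDFS reader. A usage sketch (paths are hypothetical):

    ParamReaderFactory.getParamReader("/conf/config.json", "LOCAL")  // ParamFileReader
    ParamReaderFactory.getParamReader("""{"name":"accu"}""", "raw")  // ParamRawStringReader: filePath holds the raw JSON
    ParamReaderFactory.getParamReader("/conf/config.json", "ftp")    // unknown type falls back to ParamHdfsFileReader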

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
new file mode 100644
index 0000000..a89d9f1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
@@ -0,0 +1,30 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.validator
+
+import org.apache.griffin.measure.batch.config.params.Param
+
+import scala.util.Try
+
+// validates the given params
+case class AllParamValidator() extends ParamValidator {
+
+  def validate[T <: Param](param: Param): Try[Boolean] = {
+    Try {
+      param.validate
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
new file mode 100644
index 0000000..8a78f08
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
@@ -0,0 +1,26 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.config.validator
+
+import org.apache.griffin.measure.batch.log.Loggable
+import org.apache.griffin.measure.batch.config.params.Param
+
+import scala.util.Try
+
+trait ParamValidator extends Loggable with Serializable {
+
+  def validate[T <: Param](param: Param): Try[Boolean]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
new file mode 100644
index 0000000..3835c9d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
@@ -0,0 +1,105 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.connector
+
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import com.databricks.spark.avro._
+
+import scala.util.{Success, Try}
+import java.nio.file.{Files, Paths}
+
+import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleExprs}
+import org.apache.griffin.measure.batch.utils.HdfsUtil
+
+// data connector for avro file
+case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any],
+                             ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
+                            ) extends DataConnector {
+
+  val FilePath = "file.path"
+  val FileName = "file.name"
+
+  val filePath = config.getOrElse(FilePath, "").toString
+  val fileName = config.getOrElse(FileName, "").toString
+
+  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+
+  private def pathPrefix(): Boolean = {
+    filePath.nonEmpty
+  }
+
+  private def fileExist(): Boolean = {
+    HdfsUtil.existPath(concreteFileFullPath)
+  }
+
+  def available(): Boolean = {
+    (!concreteFileFullPath.isEmpty) && fileExist
+  }
+
+  def metaData(): Try[Iterable[(String, String)]] = {
+    Try {
+      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+      st.fields.map(f => (f.name, f.dataType.typeName))
+    }
+  }
+
+  def data(): Try[RDD[(Product, Map[String, Any])]] = {
+    Try {
+      loadDataFile.flatMap { row =>
+        // generate cache data
+        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
+          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
+        }
+        val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
+
+        // filter the data source with the when clause
+        val whenResult = ruleExprs.whenClauseExprOpt match {
+          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
+          case _ => None
+        }
+
+        // get groupby data
+        whenResult match {
+          case Some(false) => None
+          case _ => {
+            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+              expr.calculate(finalExprValueMap) match {
+                case Some(v) => Some(v.asInstanceOf[AnyRef])
+                case _ => None
+              }
+            }
+            val key = toTuple(groupbyData)
+
+            Some((key, finalExprValueMap))
+          }
+        }
+      }
+    }
+  }
+
+  private def loadDataFile() = {
+    sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
+  }
+
+  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+    if (as.size > 0) {
+      val tupleClass = Class.forName("scala.Tuple" + as.size)
+      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+    } else None
+  }
+
+}
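
toTuple assembles a TupleN of matching arity by reflection, so the group-by key can serve directly as an RDD pair key. Conceptually (values illustrative; note the reflection only covers scala.Tuple1 through scala.Tuple22):

    // toTuple(Seq("u1", "20170602"))  ->  ("u1", "20170602")   via Class.forName("scala.Tuple2")
    // toTuple(Seq.empty)              ->  None                 (None is itself a Product)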

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
new file mode 100644
index 0000000..7c7cb8e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
@@ -0,0 +1,30 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.connector
+
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+
+trait DataConnector extends Serializable {
+
+  def available(): Boolean
+
+  def metaData(): Try[Iterable[(String, String)]]
+
+  def data(): Try[RDD[(Product, Map[String, Any])]]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..b586974
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
@@ -0,0 +1,45 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.connector
+
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.rule.RuleExprs
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.spark.sql.SQLContext
+
+import scala.util.Try
+
+object DataConnectorFactory {
+
+  val HiveRegex = """^(?i)hive$""".r
+  val AvroRegex = """^(?i)avro$""".r
+
+  def getDataConnector(sqlContext: SQLContext,
+                       dataConnectorParam: DataConnectorParam,
+                       ruleExprs: RuleExprs,
+                       globalFinalCacheMap: Map[String, Any]
+                      ): Try[DataConnector] = {
+    val conType = dataConnectorParam.conType
+    val version = dataConnectorParam.version
+    Try {
+      conType match {
+        case HiveRegex() => HiveDataConnector(sqlContext, dataConnectorParam.config, ruleExprs, globalFinalCacheMap)
+        case AvroRegex() => AvroDataConnector(sqlContext, dataConnectorParam.config, ruleExprs, globalFinalCacheMap)
+        case _ => throw new Exception("connector creation error!")
+      }
+    }
+  }
+
+}
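
A wiring sketch (userParam, ruleExprs and the SQLContext are assumed to come from the surrounding pipeline):

    val connector: Try[DataConnector] =
      DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam, ruleExprs, Map.empty[String, Any])
    // "hive"/"HIVE" and "avro"/"AVRO" both match case-insensitively;
    // any other conType surfaces as a Failure carrying "connector creation error!"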

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
new file mode 100644
index 0000000..de4cffb
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
@@ -0,0 +1,127 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.connector
+
+import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleExprs}
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
+import scala.util.{Success, Try}
+
+// data connector for hive
+case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any],
+                             ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
+                            ) extends DataConnector {
+
+  val Database = "database"
+  val TableName = "table.name"
+  val Partitions = "partitions"
+
+  val database = config.getOrElse(Database, "").toString
+  val tableName = config.getOrElse(TableName, "").toString
+  val partitionsString = config.getOrElse(Partitions, "").toString
+
+  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
+  val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
+
+  private def dbPrefix(): Boolean = {
+    database.nonEmpty && !database.equals("default")
+  }
+
+  def available(): Boolean = {
+    (!tableName.isEmpty) && {
+      Try {
+        if (dbPrefix) {
+          sqlContext.tables(database).filter(tableExistsSql).collect.size
+        } else {
+          sqlContext.tables().filter(tableExistsSql).collect.size
+        }
+      } match {
+        case Success(s) => s > 0
+        case _ => false
+      }
+    }
+  }
+
+  def metaData(): Try[Iterable[(String, String)]] = {
+    Try {
+      val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
+      val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# "))
+      if (partitionPos < 0) originRows
+      else originRows.take(partitionPos)
+    }
+  }
+
+  def data(): Try[RDD[(Product, Map[String, Any])]] = {
+    Try {
+      sqlContext.sql(dataSql).flatMap { row =>
+        // generate cache data
+        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
+          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
+        }
+        val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
+
+        // filter the data source with the when clause
+        val whenResult = ruleExprs.whenClauseExprOpt match {
+          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
+          case _ => None
+        }
+
+        // get groupby data
+        whenResult match {
+          case Some(false) => None
+          case _ => {
+            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+              expr.calculate(finalExprValueMap) match {
+                case Some(v) => Some(v.asInstanceOf[AnyRef])
+                case _ => None
+              }
+            }
+            val key = toTuple(groupbyData)
+
+            Some((key, finalExprValueMap))
+          }
+        }
+      }
+    }
+  }
+
+  private def tableExistsSql(): String = {
+//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
+    s"tableName LIKE '${tableName}'"
+  }
+
+  private def metaDataSql(): String = {
+    s"DESCRIBE ${concreteTableName}"
+  }
+
+  private def dataSql(): String = {
+    val clauses = partitions.map { prtn =>
+      val cls = prtn.mkString(" AND ")
+      if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
+      else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
+    }
+    clauses.mkString(" UNION ALL ")
+  }
+
+  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+    if (as.size > 0) {
+      val tupleClass = Class.forName("scala.Tuple" + as.size)
+      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+    } else None
+  }
+
+}
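
Given the split(";")/split(",") parsing above, each semicolon-separated group becomes one WHERE clause and the groups are unioned; an empty partitions string degenerates to a single full-table scan. For example, with database "db" and table "tbl" (values illustrative):

    // partitions = "dt='20170602',hour='09';dt='20170602',hour='10'"
    // dataSql() ->
    //   "SELECT * FROM db.tbl WHERE dt='20170602' AND hour='09' UNION ALL SELECT * FROM db.tbl WHERE dt='20170602' AND hour='10'"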

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
new file mode 100644
index 0000000..02f1a99
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
@@ -0,0 +1,39 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.log
+
+import org.slf4j.LoggerFactory
+
+trait Loggable {
+
+  @transient private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  protected def info(msg: String): Unit = {
+    logger.info(msg)
+  }
+
+  protected def debug(msg: String): Unit = {
+    logger.debug(msg)
+  }
+
+  protected def warn(msg: String): Unit = {
+    logger.warn(msg)
+  }
+
+  protected def error(msg: String): Unit = {
+    logger.error(msg)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
new file mode 100644
index 0000000..7a6a94b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
@@ -0,0 +1,167 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.persist
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+// persist result and data to hdfs
+case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  val Path = "path"
+  val MaxPersistLines = "max.persist.lines"
+  val MaxLinesPerFile = "max.lines.per.file"
+
+  val path = config.getOrElse(Path, "").toString
+  val maxPersistLines = try { config.getOrElse(MaxPersistLines, -1).toString.toInt } catch { case _: Throwable => -1 }
+  val maxLinesPerFile = try { config.getOrElse(MaxLinesPerFile, 10000).toString.toLong } catch { case _: Throwable => 10000 }
+
+  val separator = "/"
+
+  val StartFile = filePath("_START")
+  val FinishFile = filePath("_FINISH")
+  val ResultFile = filePath("_RESULT")
+
+  val MissRecFile = filePath("_MISSREC")      // optional
+  val MatchRecFile = filePath("_MATCHREC")    // optional
+
+  val LogFile = filePath("_LOG")
+
+  var _init = true
+  private def isInit = {
+    val i = _init
+    _init = false
+    i
+  }
+
+  def available(): Boolean = {
+    (path.nonEmpty) && (maxPersistLines < Int.MaxValue)
+  }
+
+  private def persistHead: String = {
+    val dt = new Date(timeStamp)
+    s"================ log of ${dt} ================\n"
+  }
+
+  private def timeHead(rt: Long): String = {
+    val dt = new Date(rt)
+    s"--- ${dt} ---\n"
+  }
+
+  protected def getFilePath(parentPath: String, fileName: String): String = {
+    if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName
+  }
+
+  protected def filePath(file: String): String = {
+    getFilePath(path, s"${metricName}/${timeStamp}/${file}")
+  }
+
+  protected def withSuffix(path: String, suffix: String): String = {
+    s"${path}.${suffix}"
+  }
+
+  def start(msg: String): Unit = {
+    try {
+      HdfsUtil.writeContent(StartFile, msg)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+  def finish(): Unit = {
+    try {
+      HdfsUtil.createEmptyFile(FinishFile)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  def result(rt: Long, result: Result): Unit = {
+    try {
+      val resStr = result match {
+        case ar: AccuracyResult => {
+          s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
+        }
+        case pr: ProfileResult => {
+          s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
+        }
+        case _ => {
+          s"result: ${result}"
+        }
+      }
+      HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
+      log(rt, resStr)
+
+      info(resStr)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  // split records into multiple files to avoid overly long strings
+  private def rddRecords(records: RDD[String], path: String): Unit = {
+    try {
+      val recordCount = records.count
+      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+      if (count > 0) {
+        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+        if (groupCount <= 1) {
+          val recs = records.take(count.toInt)
+          persistRecords(path, recs)
+        } else {
+          val groupedRecords: RDD[(Long, Iterable[String])] =
+            records.zipWithIndex.flatMap { r =>
+              val gid = r._2 / maxLinesPerFile
+              if (gid < groupCount) Some((gid, r._1)) else None
+            }.groupByKey()
+          groupedRecords.foreach { group =>
+            val (gid, recs) = group
+            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
+            persistRecords(hdfsPath, recs)
+          }
+        }
+      }
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  def missRecords(records: RDD[String]): Unit = {
+    rddRecords(records, MissRecFile)
+  }
+
+  def matchRecords(records: RDD[String]): Unit = {
+    rddRecords(records, MatchRecFile)
+  }
+
+  private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = {
+    val recStr = records.mkString("\n")
+    HdfsUtil.appendContent(hdfsPath, recStr)
+  }
+
+  def log(rt: Long, msg: String): Unit = {
+    try {
+      val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n"
+      HdfsUtil.appendContent(LogFile, logStr)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+}
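
filePath nests everything under path/metricName/timeStamp, so a single run lays out files like the following (illustrative values):

    // with path = "hdfs:///griffin/persist", metricName = "accu", timeStamp = 1496395912000
    //   hdfs:///griffin/persist/accu/1496395912000/_START
    //   hdfs:///griffin/persist/accu/1496395912000/_RESULT
    //   hdfs:///griffin/persist/accu/1496395912000/_MISSREC, _MISSREC.1, ...  (split by max.lines.per.file)
    //   hdfs:///griffin/persist/accu/1496395912000/_LOG
    //   hdfs:///griffin/persist/accu/1496395912000/_FINISH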

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
new file mode 100644
index 0000000..3223248
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
@@ -0,0 +1,74 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+// persist result via http
+case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  val Api = "api"
+  val Method = "method"
+
+  val api = config.getOrElse(Api, "").toString
+  val method = config.getOrElse(Method, "post").toString
+
+  def available(): Boolean = {
+    api.nonEmpty
+  }
+
+  def start(msg: String): Unit = {}
+  def finish(): Unit = {}
+
+  def result(rt: Long, result: Result): Unit = {
+    result match {
+      case ar: AccuracyResult => {
+        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
+        httpResult(dataMap)
+      }
+      case pr: ProfileResult => {
+        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch))
+        httpResult(dataMap)
+      }
+      case _ => {
+        info(s"result: ${result}")
+      }
+    }
+  }
+
+  private def httpResult(dataMap: Map[String, Any]) = {
+    try {
+      val data = JsonUtil.toJson(dataMap)
+      // post
+      val params = Map[String, Object]()
+      val header = Map[String, Object]()
+      val status = HttpUtil.httpRequest(api, method, params, header, data)
+      info(s"${method} to ${api} response status: ${status}")
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+
+  }
+
+  def missRecords(records: RDD[String]): Unit = {}
+  def matchRecords(records: RDD[String]): Unit = {}
+
+  def log(rt: Long, msg: String): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
new file mode 100644
index 0000000..ce07eca
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
@@ -0,0 +1,45 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+// persist result and data through multiple persist targets
+case class MultiPersists(persists: Iterable[Persist]) extends Persist {
+
+  val timeStamp: Long = persists match {
+    case Nil => 0
+    case _ => persists.head.timeStamp
+  }
+
+  val config: Map[String, Any] = Map[String, Any]()
+
+  def available(): Boolean = { persists.exists(_.available()) }
+
+  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
+  def finish(): Unit = { persists.foreach(_.finish()) }
+
+  def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) }
+
+  def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) }
+  def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) }
+
+  def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
new file mode 100644
index 0000000..77ee61f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
@@ -0,0 +1,40 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.log.Loggable
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+
+trait Persist extends Loggable with Serializable {
+  val timeStamp: Long
+
+  val config: Map[String, Any]
+
+  def available(): Boolean
+
+  def start(msg: String): Unit
+  def finish(): Unit
+
+  def result(rt: Long, result: Result): Unit
+
+  def missRecords(records: RDD[String]): Unit
+  def matchRecords(records: RDD[String]): Unit
+
+  def log(rt: Long, msg: String): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
new file mode 100644
index 0000000..7f48b2d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
@@ -0,0 +1,45 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.config.params.env._
+
+import scala.util.{Failure, Success, Try}
+
+
+case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {
+
+  val HDFS_REGEX = """^(?i)hdfs$""".r
+  val HTTP_REGEX = """^(?i)http$""".r
+
+  def getPersists(timeStamp: Long): MultiPersists = {
+    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
+  }
+
+  // get the persists configured
+  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
+    val persistConfig = persistParam.config
+    val persistTry = persistParam.persistType match {
+      case HDFS_REGEX() => Try(HdfsPersist(persistConfig, metricName, timeStamp))
+      case HTTP_REGEX() => Try(HttpPersist(persistConfig, metricName, timeStamp))
+      case _ => throw new Exception("not supported persist type")
+    }
+    persistTry match {
+      case Success(persist) if (persist.available) => Some(persist)
+      case _ => None
+    }
+  }
+
+}
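
A sketch of the intended use (the envParam naming is hypothetical; persist params would come from the env config):

    val factory = PersistFactory(envParam.persistParams, "accu")
    val persists: MultiPersists = factory.getPersists(System.currentTimeMillis)
    // persist targets that are unavailable (e.g. empty path or api) or of an
    // unrecognized type are dropped, so persists may wrap fewer entries than configured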

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
new file mode 100644
index 0000000..3ccb140
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
@@ -0,0 +1,40 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.result
+
+// result for accuracy: miss count, total count
+case class AccuracyResult(miss: Long, total: Long) extends Result {
+
+  type T = AccuracyResult
+
+  def update(delta: T): T = {
+    AccuracyResult(delta.miss, total)
+  }
+
+  def eventual(): Boolean = {
+    this.miss <= 0
+  }
+
+  def differsFrom(other: T): Boolean = {
+    (this.miss != other.miss) || (this.total != other.total)
+  }
+
+  def getMiss = miss
+  def getTotal = total
+  def getMatch = total - miss
+
+  def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
+
+}
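
A quick worked example of the counters (values are illustrative):

    val r = AccuracyResult(miss = 5, total = 100)
    r.getMatch                                       // 95
    r.matchPercentage                                // 95.0  (= 95.toDouble / 100 * 100)
    r.update(AccuracyResult(miss = 3, total = 100))  // AccuracyResult(3, 100): miss is replaced, total kept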

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
new file mode 100644
index 0000000..41a0639
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/result/ProfileResult.scala
@@ -0,0 +1,40 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.result
+
+// result for profile: match count, total count
+case class ProfileResult(matchCount: Long, totalCount: Long) extends Result {
+
+  type T = ProfileResult
+
+  def update(delta: T): T = {
+    ProfileResult(matchCount + delta.matchCount, totalCount)
+  }
+
+  def eventual(): Boolean = {
+    this.matchCount >= totalCount
+  }
+
+  def differsFrom(other: T): Boolean = {
+    (this.matchCount != other.matchCount) || (this.totalCount != other.totalCount)
+  }
+
+  def getMiss = totalCount - matchCount
+  def getTotal = totalCount
+  def getMatch = matchCount
+
+  def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
new file mode 100644
index 0000000..d0df265
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
@@ -0,0 +1,28 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.result
+
+
+trait Result extends Serializable {
+
+  type T <: Result
+
+  def update(delta: T): T
+
+  def eventual(): Boolean
+
+  def differsFrom(other: T): Boolean
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
new file mode 100644
index 0000000..731190d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
@@ -0,0 +1,53 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.result
+
+
+sealed trait ResultInfo {
+  type T
+  val key: String
+  val tp: String
+  def wrap(value: T) = (key -> value)
+}
+
+final case object TimeGroupInfo extends ResultInfo {
+  type T = Long
+  val key = "__time__"
+  val tp = "bigint"
+}
+
+final case object NextFireTimeInfo extends ResultInfo {
+  type T = Long
+  val key = "__next_fire_time__"
+  val tp = "bigint"
+}
+
+final case object MismatchInfo extends ResultInfo {
+  type T = String
+  val key = "__mismatch__"
+  val tp = "string"
+}
+
+final case object TargetInfo extends ResultInfo {
+  type T = Map[String, Any]
+  val key = "__target__"
+  val tp = "map"
+}
+
+final case object ErrorInfo extends ResultInfo {
+  type T = String
+  val key = "__error__"
+  val tp = "string"
+}
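
wrap simply pairs the well-known key with a value, which is handy when assembling the final record map (the timestamp value is illustrative):

    val time: (String, Long) = TimeGroupInfo.wrap(1496395912000L)
    // ("__time__", 1496395912000L)
    val err: (String, String) = ErrorInfo.wrap("connector creation error!")
    // ("__error__", "connector creation error!")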

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
new file mode 100644
index 0000000..0c26a8b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
@@ -0,0 +1,311 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule
+
+import scala.util.{Success, Try}
+
+
+object CalculationUtil {
+
+  implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v)
+
+  // redefines the calculation semantics of operators in the DSL
+  case class CalculationValue(value: Option[_]) extends Serializable {
+
+    def + (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def - (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def * (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
+          case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def / (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def % (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    // Unary negation. Non-numeric conventions: a String is reversed, a Boolean is negated.
+    def unary_- : Option[_] = {
+      value match {
+        case None => None
+        case Some(null) => None
+        case Some(v: String) => Some(v.reverse)
+        case Some(v: Boolean) => Some(!v)
+        case Some(v: Byte) => Some(-v)
+        case Some(v: Short) => Some(-v)
+        case Some(v: Int) => Some(-v)
+        case Some(v: Long) => Some(-v)
+        case Some(v: Float) => Some(-v)
+        case Some(v: Double) => Some(-v)
+        case Some(v) => Some(v)
+        case _ => None
+      }
+    }
+
+
+    def === (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (None, None) => Some(true)
+        case (Some(v1), Some(v2)) => Some(v1 == v2)
+        case _ => Some(false)
+      }
+    }
+
+    def =!= (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (None, None) => Some(false)
+        case (Some(v1), Some(v2)) => Some(v1 != v2)
+        case _ => Some(true)
+      }
+    }
+
+    def > (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2: String)) => Some(v1 > v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def >= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, None) | (Some(null), Some(null)) => Some(true)
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def < (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def <= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, None) | (Some(null), Some(null)) => Some(true)
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+
+    def in (other: Iterable[Option[_]]): Option[Boolean] = {
+      other.foldLeft(Some(false): Option[Boolean]) { (res, next) =>
+        optOr(res, ===(next))
+      }
+    }
+
+    def not_in (other: Iterable[Option[_]]): Option[Boolean] = {
+      other.foldLeft(Some(true): Option[Boolean]) { (res, next) =>
+        optAnd(res, =!=(next))
+      }
+    }
+
+    def between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        if (begin.isEmpty && end.isEmpty) Some(value.isEmpty)
+        else optAnd(>=(begin), <=(end))
+      }
+    }
+
+    def not_between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        if (begin.isEmpty && end.isEmpty) Some(value.nonEmpty)
+        else optOr(<(begin), >(end))
+      }
+    }
+
+    def unary_! : Option[Boolean] = {
+      optNot(value)
+    }
+
+    def && (other: Option[_]): Option[Boolean] = {
+      optAnd(value, other)
+    }
+
+    def || (other: Option[_]): Option[Boolean] = {
+      optOr(value, other)
+    }
+
+
+    private def optNot(a: Option[_]): Option[Boolean] = {
+      a match {
+        case None => None
+        case Some(null) => None
+        case Some(v: Boolean) => Some(!v)
+        case _ => None
+      }
+    }
+    private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = {
+      (a, b) match {
+        case (None, _) | (_, None) => None
+        case (Some(null), _) | (_, Some(null)) => None
+        case (Some(false), _) | (_, Some(false)) => Some(false)
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
+        case _ => None
+      }
+    }
+    private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = {
+      (a, b) match {
+        case (None, _) | (_, None) => None
+        case (Some(null), _) | (_, Some(null)) => None
+        case (Some(true), _) | (_, Some(true)) => Some(true)
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
+        case _ => None
+      }
+    }
+  }
+
+}
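
A minimal usage sketch of the operators above, once the implicit conversion is imported; the sample values are illustrative only, and the results follow the case matches shown in the patch:

    import org.apache.griffin.measure.batch.rule.CalculationUtil._

    val sum: Option[_] = Some(1) + Some(2)                                 // Some(3)
    val miss: Option[_] = Some(1) + None                                   // None: a missing operand poisons the result
    val cmp: Option[Boolean] = Some(5) > Some("3")                         // Some(true): the right side is coerced via toString
    val ranged: Option[Boolean] = Some(4) between Seq(Some(1), Some(10))   // Some(true)
    val both: Option[Boolean] = Some(true) && Some(false)                  // Some(false)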

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala
new file mode 100644
index 0000000..d3c1b5b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala
@@ -0,0 +1,89 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.spark.sql.Row
+
+import scala.util.{Success, Try}
+
+object ExprValueUtil {
+
+  // From the origin data (such as a Row of a DataFrame) and an existing expr value map, calculate the given expression and get its value.
+  // For now, each expr yields exactly one value; an expr yielding multiple values is not supported.
+  // params:
+  // - originData: the origin data, such as a Row of a DataFrame
+  // - expr: the expression to be calculated
+  // - existExprValueMap: the existing expression value map, which may supply already-calculated expression values during calculation
+  // output: the calculated expression value
+  private def calcExprValue(originData: Option[Any], expr: Expr, existExprValueMap: Map[String, Any]): Option[Any] = {
+    Try {
+      expr match {
+        case selection: SelectionExpr => {
+          selection.selectors.foldLeft(originData) { (dt, selector) =>
+            calcExprValue(dt, selector, existExprValueMap)
+          }
+        }
+        case selector: IndexFieldRangeSelectExpr => {
+          originData match {
+            case Some(row: Row) => {
+              if (selector.fields.size == 1) {
+                selector.fields.head match {
+                  case i: IndexDesc => Some(row.getAs[Any](i.index))
+                  case f: FieldDesc => Some(row.getAs[Any](f.field))
+                  case _ => None
+                }
+              } else None
+            }
+            case _ => None
+          }
+        }
+        case _ => expr.calculate(existExprValueMap)
+      }
+    } match {
+      case Success(v) => v
+      case _ => None
+    }
+  }
+
+  // Try to calculate an expr from the data and initExprValueMap, generating a new expression value map.
+  // Depends on both the origin data and the existing expr value map.
+  def genExprValueMap(data: Option[Any], expr: Expr, initExprValueMap: Map[String, Any]): Map[String, Any] = {
+    val valueOpt = calcExprValue(data, expr, initExprValueMap)
+    if (valueOpt.nonEmpty) {
+      initExprValueMap + (expr._id -> valueOpt.get)
+    } else initExprValueMap
+  }
+
+  // Try to calculate several exprs from the data and initExprValueMap, generating a new expression value map.
+  // Depends on both the origin data and the existing expr value map.
+  def genExprValueMap(data: Option[Any], exprs: Iterable[Expr], initExprValueMap: Map[String, Any]): Map[String, Any] = {
+    exprs.foldLeft(initExprValueMap) { (evMap, expr) =>
+      genExprValueMap(data, expr, evMap)
+    }
+  }
+
+  // With the exprValueMap, calculate the expressions and build an updated expression value map.
+  // Depends only on the existing expr value map; pure calculation, no origin data needed.
+  def updateExprValueMap(exprs: Iterable[Expr], exprValueMap: Map[String, Any]): Map[String, Any] = {
+    exprs.foldLeft(Map[String, Any]()) { (evMap, expr) =>
+      val valueOpt = expr.calculate(exprValueMap)
+      if (valueOpt.nonEmpty) {
+        evMap + (expr._id -> valueOpt.get)
+      } else evMap
+    }
+  }
+
+}
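
A sketch of how these helpers compose with the RuleParser and RuleAnalyzer below; the rule text, schema, and row values are invented for illustration, and exactly which exprs end up cacheable depends on the expr package elsewhere in this patch:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // a schema-carrying Row, so that row.getAs[Any](field) in calcExprValue can resolve field names
    val schema = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
    val row: Row = new GenericRowWithSchema(Array("apache", 18), schema)

    val parser = RuleParser()
    val rule = parser.parseAll(parser.rule, "$source.name = $target.name").get
    val ruleExprs = RuleAnalyzer(rule).sourceRuleExprs

    // calculate the cacheable exprs from the row, then derive the final cached values from them
    val cacheMap = ExprValueUtil.genExprValueMap(Some(row), ruleExprs.cacheExprs, Map[String, Any]())
    val finalMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheMap)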

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
new file mode 100644
index 0000000..a4b478f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
@@ -0,0 +1,74 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.rule.expr._
+
+case class RuleAnalyzer(rule: StatementExpr) extends Serializable {
+
+  val constData = ""
+  private val SourceData = "source"
+  private val TargetData = "target"
+
+  val constCacheExprs: Iterable[Expr] = rule.getCacheExprs(constData)
+  private val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
+  private val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)
+
+  private val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
+  private val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)
+
+  val constFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(constData).toSet
+  private val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet
+  private val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet
+
+  private val groupbyExprPairs: Seq[(Expr, Expr)] = rule.getGroupbyExprPairs((SourceData, TargetData))
+  private val sourceGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._1)
+  private val targetGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._2)
+
+  private val whenClauseExprOpt: Option[LogicalExpr] = rule.getWhenClauseExpr
+
+  val sourceRuleExprs: RuleExprs = RuleExprs(sourceGroupbyExprs, sourceCacheExprs,
+    sourceFinalCacheExprs, sourcePersistExprs, whenClauseExprOpt)
+  val targetRuleExprs: RuleExprs = RuleExprs(targetGroupbyExprs, targetCacheExprs,
+    targetFinalCacheExprs, targetPersistExprs, whenClauseExprOpt)
+
+}
+
+
+// for a single data source
+// groupbyExprs: in the accuracy case, these exprs can serve as group-by exprs
+//                  they are the data keys for the accuracy case, generated from the equality statements, to improve calculation efficiency
+// cacheExprs: exprs whose values can be calculated independently and cached for later use
+//                  cached for the finalCacheExprs calculation; it contains some redundant values, so persisting all of it would be wasteful
+// finalCacheExprs: the roots of cacheExprs plus the persistExprs, cached for later use
+//                  cached for calculation, and can be saved for re-calculation in streaming mode
+// persistExprs: expr values that should be persisted; only direct selection exprs are persistable
+//                  persisted to record the missing data, so they need to be readable as raw data
+// whenClauseExprOpt: the when clause of the rule, determining whether a row of the data source is filtered out
+//                  it can be pre-calculated to filter data in the data connector
+case class RuleExprs(groupbyExprs: Seq[Expr],
+                     cacheExprs: Iterable[Expr],
+                     finalCacheExprs: Iterable[Expr],
+                     persistExprs: Iterable[Expr],
+                     whenClauseExprOpt: Option[LogicalExpr]
+                    ) {
+  // For example, given the rule "$source.name = $target.name AND $source.age < $target.age + (3 * 4)",
+  // the targetRuleExprs for the target data source would contain:
+  // groupbyExprs: $target.name
+  // cacheExprs: $target.name, $target.age, $target.age + (3 * 4)
+  // finalCacheExprs: $target.name, $target.age + (3 * 4), $target.age
+  // persistExprs: $target.name, $target.age
+  // whenClauseExprOpt: None
+}
\ No newline at end of file
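
To see the decomposition described in the comment above, one can parse the example rule and inspect the analyzer's outputs; a sketch with illustrative printing, where the exact contents depend on the expr package elsewhere in this patch:

    val parser = RuleParser()
    val rule = parser.parseAll(parser.rule,
      "$source.name = $target.name AND $source.age < $target.age + (3 * 4)").get
    val analyzer = RuleAnalyzer(rule)

    val te = analyzer.targetRuleExprs
    println(te.groupbyExprs)    // expected to hold the $target.name selection
    println(te.persistExprs)    // expected to hold $target.name and $target.age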

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
new file mode 100644
index 0000000..7ba7ca4
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
@@ -0,0 +1,48 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.rule.expr._
+
+import scala.util.{Failure, Success, Try}
+
+
+case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {
+
+  val ruleParser: RuleParser = RuleParser()
+
+  def generateRule(): StatementExpr = {
+    parseExpr(evaluateRuleParam.rules) match {
+      case Success(se) => se
+      case Failure(ex) => throw ex
+    }
+  }
+
+  private def parseExpr(rules: String): Try[StatementExpr] = {
+    Try {
+      val result = ruleParser.parseAll(ruleParser.rule, rules)
+      if (result.successful) result.get
+      else throw new Exception("parse rule error!")
+    }
+  }
+
+}
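
Since generateRule rethrows any parse failure, a caller that prefers to degrade gracefully can wrap it; a small sketch, where the wrapper name is hypothetical:

    import scala.util.{Failure, Success, Try}

    def tryGenerateRule(factory: RuleFactory): Option[StatementExpr] = {
      Try(factory.generateRule()) match {
        case Success(statement) => Some(statement)
        case Failure(ex) =>
          println(s"invalid rule: ${ex.getMessage}")   // a real caller would log this instead
          None
      }
    }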

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
new file mode 100644
index 0000000..29558db
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
@@ -0,0 +1,236 @@
+/*-
+ * Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+ */
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.rule.expr._
+
+import scala.util.parsing.combinator._
+
+case class RuleParser() extends JavaTokenParsers with Serializable {
+
+  /**
+    * BNF representation for grammar as below:
+    *
+    * <rule> ::= <logical-statement> [WHEN <logical-statement>]
+    * rule: mapping-rule [WHEN when-rule]
+    * - mapping-rule: the top-level operator should preferably not be OR or NOT, otherwise the group-by columns cannot be found automatically
+    * - when-rule: may only reference general information about the data source, not information specific to each data row
+    *
+    * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
+    * logical-statement: return boolean value
+    * logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
+    *
+    * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
+    * logical-expression example: $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
+    *
+    * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
+    * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
+    * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
+    * range-expr example: ('3214', '4312', '60821'), (10, 15), ()
+    *
+    * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
+    * math-expr example: $source.price * $target.count, "hello" + " " + "world" + 123
+    *
+    * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
+    * <unary-opr> ::= "+" | "-"
+    *
+    * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
+    *
+    * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
+    * selection example: $source.price, $source.json(), $source['state'], $source.numList[3], $target.json().mails['org' = 'apache'].names[*]
+    *
+    * <selection-head> ::= $source | $target
+    *
+    * <field-sel> ::= "." <field-string>
+    *
+    * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
+    * <function-name> ::= <name-string>
+    * <arg> ::= <math-expr>
+    *
+    * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
+    * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
+    * index-field-range: 2 means the 3rd item, (0, 3) means the first 4 items, * means all items, 'age' means the item named 'age'
+    * <index-field> ::= <index> | <field-quote> | <all-selection>
+    * index: 0 ~ n means the position counting from the start, -1 ~ -n means the position counting from the end
+    * <field-quote> ::= ' <field-string> ' | " <field-string> "
+    *
+    * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
+    * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
+    * filter-sel example: ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5 ]
+    *
+    * When a <math-expr> appears inside a selection, it must not reference a different <selection-head>, for example:
+    * $source.tags[1+2]             valid
+    * $source.tags[$source.first]   valid
+    * $source.tags[$target.first]   invalid
+    * -- Enforcing this is the validator's job, not the parser's
+    *
+    *
+    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-none>
+    * <literal-string> ::= <any-string>
+    * <literal-number> ::= <integer> | <double>
+    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
+    * <literal-boolean> ::= true | false
+    * <literal-null> ::= null | undefined
+    * <literal-none> ::= none
+    *
+    */
+
+  object Keyword {
+    def WhenKeywords: Parser[String] = """(?i)when""".r
+    def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
+    def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
+    def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
+    def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
+    def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
+  }
+  import Keyword._
+
+  object Operator {
+    def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
+    def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
+    def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
+    def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
+    def RangeOpr: Parser[String] = RangeKeywords
+
+    def UnaryMathOpr: Parser[String] = "+" | "-"
+    def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
+    def BinaryMathOpr2: Parser[String] = "+" | "-"
+
+    def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
+
+    def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
+    def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
+    def Dot: Parser[String] = "."
+    def AllSelection: Parser[String] = "*"
+    def SQuote: Parser[String] = "'"
+    def DQuote: Parser[String] = "\""
+    def Comma: Parser[String] = ","
+  }
+  import Operator._
+
+  object SomeString {
+    def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
+    def SimpleFieldString: Parser[String] = """\w+""".r
+    def FieldString: Parser[String] = """[\w\s]+""".r
+    def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
+  }
+  import SomeString._
+
+  object SomeNumber {
+    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
+    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
+    def IndexNumber: Parser[String] = IntegerNumber
+  }
+  import SomeNumber._
+
+  // -- literal --
+  def literal: Parser[LiteralExpr] = literalString | literalTime | literalNumber | literalBoolean | literalNull | literalNone
+  def literalString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
+  def literalNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
+  def literalTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
+  def literalBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
+  def literalNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r) ^^ { LiteralNullExpr(_) }
+  def literalNone: Parser[LiteralNoneExpr] = """(?i)none""".r ^^ { LiteralNoneExpr(_) }
+
+  // -- selection --
+  // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
+  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
+    case head ~ selectors => SelectionExpr(head, selectors)
+  }
+  def selector: Parser[SelectExpr] = (functionOperation | fieldSelect | indexFieldRangeSelect | filterSelect)
+
+  def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
+  // <field-sel> ::= "." <field-string>
+  def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> SimpleFieldString ^^ {
+    case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
+  }
+  // <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
+  def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
+    case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
+  }
+  def argument: Parser[MathExpr] = mathExpr
+  // <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
+  def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
+    case ifrs => IndexFieldRangeSelectExpr(ifrs)
+  }
+  // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
+  def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
+    case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
+  }
+  // <index-field> ::= <index> | <field-quote> | <all-selection>
+  // <math-expr> could be accepted here as well, but to keep things simple it is not supported for now
+  def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
+  // <field-quote> ::= ' <field-string> ' | " <field-string> "
+  def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
+  // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
+  def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote ~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
+    case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
+  }
+
+  // -- math --
+  // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
+  def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
+  // <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
+  // <unary-opr> ::= "+" | "-"
+  def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
+    case Nil ~ a => a
+    case list ~ a => UnaryMathExpr(list, a)
+  }
+  // <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
+  def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ unaryMathExpr) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
+  }
+  def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 ~ binaryMathExpr1) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
+  }
+  def mathExpr: Parser[MathExpr] = binaryMathExpr2
+
+  // -- logical expression --
+  // <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
+  def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) <~ BracketPair._2 ^^ { RangeDesc(_) }
+  // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
+  def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
+    case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
+  } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
+    case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
+  } | mathExpr ^^ { LogicalSimpleExpr(_) }
+
+  // -- logical statement --
+  def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> logicalStatement <~ BracketPair._2
+  def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ logicalFactor ^^ {
+    case Nil ~ a => a
+    case list ~ a => UnaryLogicalExpr(list, a)
+  }
+  def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
+  }
+  def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
+  }
+  // <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
+  def logicalStatement: Parser[LogicalExpr] = orLogicalStatement
+
+  // -- rule --
+  // <rule> ::= <logical-statement> [WHEN <logical-statement>]
+  def rule: Parser[StatementExpr] = logicalStatement ~ opt(WhenKeywords ~> logicalStatement) ^^ {
+    case ls ~ Some(ws) => WhenClauseStatementExpr(ls, ws)
+    case ls ~ _ => SimpleStatementExpr(ls)
+  }
+
+}
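
A quick sketch exercising the grammar above with rule strings drawn from the examples in the comments; parseAll consumes the whole input and reports success per rule:

    val parser = RuleParser()
    Seq(
      "$source.id = $target.id",
      "$source.page_id IN ('3214', '4312', '60821')",
      "$source.id = $target.id WHEN $source.age > 18",
      "$source.price * $source.count <= $target.total + 10.5"
    ).foreach { text =>
      val result = parser.parseAll(parser.rule, text)
      println(s"$text -> ${if (result.successful) "ok" else result.toString}")
    }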