You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:22 UTC

[07/11] incubator-griffin git commit: Dsl modify

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala
deleted file mode 100644
index c969012..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import scala.util.{Success, Try}
-
-
-object CalculationUtil {
-
-  implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v)
-
-  // redefine the calculation method of operators in DSL
-  case class CalculationValue(value: Option[_]) extends Serializable {
-
-    def + (other: Option[_]): Option[_] = {
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte)
-          case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
-          case _ => value
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def - (other: Option[_]): Option[_] = {
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
-          case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble)
-          case _ => value
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def * (other: Option[_]): Option[_] = {
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
-          case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
-          case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble)
-          case _ => value
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def / (other: Option[_]): Option[_] = {
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
-          case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble)
-          case _ => value
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def % (other: Option[_]): Option[_] = {
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
-          case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
-          case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
-          case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong)
-          case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat)
-          case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble)
-          case _ => value
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def unary_- (): Option[_] = {
-      value match {
-        case None => None
-        case Some(null) => None
-        case Some(v: String) => Some(v.reverse.toString)
-        case Some(v: Boolean) => Some(!v)
-        case Some(v: Byte) => Some(-v)
-        case Some(v: Short) => Some(-v)
-        case Some(v: Int) => Some(-v)
-        case Some(v: Long) => Some(-v)
-        case Some(v: Float) => Some(-v)
-        case Some(v: Double) => Some(-v)
-        case Some(v) => Some(v)
-        case _ => None
-      }
-    }
-
-
-    def === (other: Option[_]): Option[Boolean] = {
-      (value, other) match {
-        case (None, None) => Some(true)
-        case (Some(v1), Some(v2)) => Some(v1 == v2)
-        case _ => Some(false)
-      }
-    }
-
-    def =!= (other: Option[_]): Option[Boolean] = {
-      (value, other) match {
-        case (None, None) => Some(false)
-        case (Some(v1), Some(v2)) => Some(v1 != v2)
-        case _ => Some(true)
-      }
-    }
-
-    def > (other: Option[_]): Option[Boolean] = {
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2: String)) => Some(v1 > v2)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble)
-          case _ => None
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def >= (other: Option[_]): Option[Boolean] = {
-      Try {
-        (value, other) match {
-          case (None, None) | (Some(null), Some(null)) => Some(true)
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble)
-          case _ => None
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def < (other: Option[_]): Option[Boolean] = {
-      Try {
-        (value, other) match {
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble)
-          case _ => None
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-    def <= (other: Option[_]): Option[Boolean] = {
-      Try {
-        (value, other) match {
-          case (None, None) | (Some(null), Some(null)) => Some(true)
-          case (None, _) | (_, None) => None
-          case (Some(null), _) | (_, Some(null)) => None
-          case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
-          case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Float), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble)
-          case _ => None
-        }
-      } match {
-        case Success(opt) => opt
-        case _ => None
-      }
-    }
-
-
-    def in (other: Iterable[Option[_]]): Option[Boolean] = {
-      other.foldLeft(Some(false): Option[Boolean]) { (res, next) =>
-        optOr(res, ===(next))
-      }
-    }
-
-    def not_in (other: Iterable[Option[_]]): Option[Boolean] = {
-      other.foldLeft(Some(true): Option[Boolean]) { (res, next) =>
-        optAnd(res, =!=(next))
-      }
-    }
-
-    def between (other: Iterable[Option[_]]): Option[Boolean] = {
-      if (other.size < 2) None else {
-        val (begin, end) = (other.head, other.tail.head)
-        if (begin.isEmpty && end.isEmpty) Some(value.isEmpty)
-        else optAnd(>=(begin), <=(end))
-      }
-    }
-
-    def not_between (other: Iterable[Option[_]]): Option[Boolean] = {
-      if (other.size < 2) None else {
-        val (begin, end) = (other.head, other.tail.head)
-        if (begin.isEmpty && end.isEmpty) Some(value.nonEmpty)
-        else optOr(<(begin), >(end))
-      }
-    }
-
-    def unary_! (): Option[Boolean] = {
-      optNot(value)
-    }
-
-    def && (other: Option[_]): Option[Boolean] = {
-      optAnd(value, other)
-    }
-
-    def || (other: Option[_]): Option[Boolean] = {
-      optOr(value, other)
-    }
-
-
-    private def optNot(a: Option[_]): Option[Boolean] = {
-      a match {
-        case None => None
-        case Some(null) => None
-        case Some(v: Boolean) => Some(!v)
-        case _ => None
-      }
-    }
-    private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = {
-      (a, b) match {
-        case (None, _) | (_, None) => None
-        case (Some(null), _) | (_, Some(null)) => None
-        case (Some(false), _) | (_, Some(false)) => Some(false)
-        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
-        case _ => None
-      }
-    }
-    private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = {
-      (a, b) match {
-        case (None, _) | (_, None) => None
-        case (Some(null), _) | (_, Some(null)) => None
-        case (Some(true), _) | (_, Some(true)) => Some(true)
-        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala
deleted file mode 100644
index 9d027ec..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.spark.sql.types._
-
-object DataTypeCalculationUtil {
-
-  implicit def dataType2CalculationType(tp: DataType): CalculationType = CalculationType(tp)
-
-  case class CalculationType(tp: DataType) extends Serializable {
-    def binaryOpr (other: DataType): DataType = {
-      (tp, other) match {
-        case (NullType, _) | (_, NullType) => NullType
-        case (t, _) => t
-      }
-    }
-    def unaryOpr (): DataType = {
-      tp
-    }
-  }
-
-  case class DataTypeException() extends Exception {}
-
-  def getDataType(value: Any): DataType = {
-    value match {
-      case v: String => StringType
-      case v: Boolean => BooleanType
-      case v: Long => LongType
-      case v: Int => IntegerType
-      case v: Short => ShortType
-      case v: Byte => ByteType
-      case v: Double => DoubleType
-      case v: Float => FloatType
-      case v: Map[_, _] => MapType(getSameDataType(v.keys), getSameDataType(v.values))
-      case v: Iterable[_] => ArrayType(getSameDataType(v))
-      case _ => NullType
-    }
-  }
-
-  private def getSameDataType(values: Iterable[Any]): DataType = {
-    values.foldLeft(NullType: DataType)((a, c) => genericTypeOf(a, getDataType(c)))
-  }
-
-  private def genericTypeOf(dt1: DataType, dt2: DataType): DataType = {
-    if (dt1 == dt2) dt1 else {
-      dt1 match {
-        case NullType => dt2
-        case StringType => dt1
-        case DoubleType => {
-          dt2 match {
-            case StringType => dt2
-            case DoubleType | FloatType | LongType | IntegerType | ShortType | ByteType => dt1
-            case _ => throw DataTypeException()
-          }
-        }
-        case FloatType => {
-          dt2 match {
-            case StringType | DoubleType => dt2
-            case FloatType | LongType | IntegerType | ShortType | ByteType => dt1
-            case _ => throw DataTypeException()
-          }
-        }
-        case LongType => {
-          dt2 match {
-            case StringType | DoubleType | FloatType => dt2
-            case LongType | IntegerType | ShortType | ByteType => dt1
-            case _ => throw DataTypeException()
-          }
-        }
-        case IntegerType => {
-          dt2 match {
-            case StringType | DoubleType | FloatType | LongType => dt2
-            case IntegerType | ShortType | ByteType => dt1
-            case _ => throw DataTypeException()
-          }
-        }
-        case ShortType => {
-          dt2 match {
-            case StringType | DoubleType | FloatType | LongType | IntegerType => dt2
-            case ShortType | ByteType => dt1
-            case _ => throw DataTypeException()
-          }
-        }
-        case ByteType => {
-          dt2 match {
-            case StringType | DoubleType | FloatType | LongType | IntegerType | ShortType => dt2
-            case ByteType => dt1
-            case _ => throw DataTypeException()
-          }
-        }
-        case BooleanType => {
-          dt2 match {
-            case StringType => dt2
-            case BooleanType => dt1
-            case _ => throw DataTypeException()
-          }
-        }
-        case MapType(kdt1, vdt1, _) => {
-          dt2 match {
-            case MapType(kdt2, vdt2, _) => MapType(genericTypeOf(kdt1, kdt2), genericTypeOf(vdt1, vdt2))
-            case _ => throw DataTypeException()
-          }
-        }
-        case ArrayType(vdt1, _) => {
-          dt2 match {
-            case ArrayType(vdt2, _) => ArrayType(genericTypeOf(vdt1, vdt2))
-            case _ => throw DataTypeException()
-          }
-        }
-        case _ => throw DataTypeException()
-      }
-    }
-  }
-
-  def sequenceDataTypeMap(aggr: Map[String, DataType], value: Map[String, Any]): Map[String, DataType] = {
-    val dataTypes = value.foldLeft(Map[String, DataType]()) { (map, pair) =>
-      val (k, v) = pair
-      try {
-        map + (k -> getDataType(v))
-      } catch {
-        case e: DataTypeException => map
-      }
-    }
-    combineDataTypeMap(aggr, dataTypes)
-  }
-
-  def combineDataTypeMap(aggr1: Map[String, DataType], aggr2: Map[String, DataType]): Map[String, DataType] = {
-    aggr2.foldLeft(aggr1) { (a, c) =>
-      a.get(c._1) match {
-        case Some(t) => {
-          try {
-            a + (c._1 -> genericTypeOf(t, c._2))
-          } catch {
-            case e: DataTypeException => a
-          }
-        }
-        case _ => a + c
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala
deleted file mode 100644
index 940d0cb..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.rule.func._
-import org.apache.spark.sql.Row
-
-import scala.util.{Success, Try}
-
-object ExprValueUtil {
-
-  private def append(path: List[String], step: String): List[String] = {
-    path :+ step
-  }
-
-  private def value2Map(key: String, value: Option[Any]): Map[String, Any] = {
-    value.flatMap(v => Some((key -> v))).toMap
-  }
-
-  private def getSingleValue(data: Option[Any], desc: FieldDescOnly): Option[Any] = {
-    data match {
-      case Some(row: Row) => {
-        desc match {
-          case i: IndexDesc => try { Some(row.getAs[Any](i.index)) } catch { case _ => None }
-          case f: FieldDesc => try { Some(row.getAs[Any](f.field)) } catch { case _ => None }
-          case _ => None
-        }
-      }
-      case Some(d: Map[String, Any]) => {
-        desc match {
-          case f: FieldDesc => d.get(f.field)
-          case _ => None
-        }
-      }
-      case Some(d: Seq[Any]) => {
-        desc match {
-          case i: IndexDesc => if (i.index >= 0 && i.index < d.size) Some(d(i.index)) else None
-          case _ => None
-        }
-      }
-    }
-  }
-
-  private def calcExprValues(pathDatas: List[(List[String], Option[Any])], expr: Expr, existExprValueMap: Map[String, Any]): List[(List[String], Option[Any])] = {
-    Try {
-      expr match {
-        case selection: SelectionExpr => {
-          selection.selectors.foldLeft(pathDatas) { (pds, selector) =>
-            calcExprValues(pds, selector, existExprValueMap)
-          }
-        }
-        case selector: IndexFieldRangeSelectExpr => {
-          pathDatas.flatMap { pathData =>
-            val (path, data) = pathData
-            data match {
-              case Some(row: Row) => {
-                selector.fields.flatMap { field =>
-                  field match {
-                    case (_: IndexDesc) | (_: FieldDesc) => {
-                      getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
-                    }
-                    case a: AllFieldsDesc => {
-                      (0 until row.size).flatMap { i =>
-                        getSingleValue(data, IndexDesc(i.toString)).map { v =>
-                          (append(path, s"${a.desc}_${i}"), Some(v))
-                        }
-                      }.toList
-                    }
-                    case r: FieldRangeDesc => {
-                      (r.startField, r.endField) match {
-                        case (si: IndexDesc, ei: IndexDesc) => {
-                          (si.index to ei.index).flatMap { i =>
-                            (append(path, s"${r.desc}_${i}"), getSingleValue(data, IndexDesc(i.toString)))
-                            getSingleValue(data, IndexDesc(i.toString)).map { v =>
-                              (append(path, s"${r.desc}_${i}"), Some(v))
-                            }
-                          }.toList
-                        }
-                        case _ => Nil
-                      }
-                    }
-                    case _ => Nil
-                  }
-                }
-              }
-              case Some(d: Map[String, Any]) => {
-                selector.fields.flatMap { field =>
-                  field match {
-                    case (_: IndexDesc) | (_: FieldDesc) => {
-                      getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
-                    }
-                    case a: AllFieldsDesc => {
-                      d.keySet.flatMap { k =>
-                        getSingleValue(data, FieldDesc(k)).map { v =>
-                          (append(path, s"${a.desc}_${k}"), Some(v))
-                        }
-                      }
-                    }
-                    case _ => None
-                  }
-                }
-              }
-              case Some(d: Seq[Any]) => {
-                selector.fields.flatMap { field =>
-                  field match {
-                    case (_: IndexDesc) | (_: FieldDesc) => {
-                      getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
-                    }
-                    case a: AllFieldsDesc => {
-                      (0 until d.size).flatMap { i =>
-                        (append(path, s"${a.desc}_${i}"), getSingleValue(data, IndexDesc(i.toString)))
-                        getSingleValue(data, IndexDesc(i.toString)).map { v =>
-                          (append(path, s"${a.desc}_${i}"), Some(v))
-                        }
-                      }.toList
-                    }
-                    case r: FieldRangeDesc => {
-                      (r.startField, r.endField) match {
-                        case (si: IndexDesc, ei: IndexDesc) => {
-                          (si.index to ei.index).flatMap { i =>
-                            (append(path, s"${r.desc}_${i}"), getSingleValue(data, IndexDesc(i.toString)))
-                            getSingleValue(data, IndexDesc(i.toString)).map { v =>
-                              (append(path, s"${r.desc}_${i}"), Some(v))
-                            }
-                          }.toList
-                        }
-                        case _ => None
-                      }
-                    }
-                    case _ => None
-                  }
-                }
-              }
-            }
-          }
-        }
-        case selector: FunctionOperationExpr => {
-          val args: Array[Option[Any]] = selector.args.map { arg =>
-            arg.calculate(existExprValueMap)
-          }.toArray
-          pathDatas.flatMap { pathData =>
-            val (path, data) = pathData
-            data match {
-              case Some(d: String) => {
-                val res = FunctionUtil.invoke(selector.func, Some(d) +: args)
-                val residx = res.zipWithIndex
-                residx.map { vi =>
-                  val (v, i) = vi
-                  val step = if (i == 0) s"${selector.desc}" else s"${selector.desc}_${i}"
-                  (append(path, step), v)
-                }
-              }
-              case _ => None
-            }
-          }
-        }
-        case selector: FilterSelectExpr => {  // fileter means select the items fit the condition
-          pathDatas.flatMap { pathData =>
-            val (path, data) = pathData
-            data match {
-              case Some(row: Row) => {
-                // right value could not be selection
-                val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
-                (0 until row.size).flatMap { i =>
-                  val dt = getSingleValue(data, IndexDesc(i.toString))
-                  val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
-                  val partValueMap = lmap ++ rmap
-                  selector.calculate(partValueMap) match {
-                    case Some(true) => Some((append(path, s"${selector.desc}_${i}"), dt))
-                    case _ => None
-                  }
-                }
-              }
-              case Some(d: Map[String, Any]) => {
-                val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
-                d.keySet.flatMap { k =>
-                  val dt = getSingleValue(data, FieldDesc(k))
-                  val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
-                  val partValueMap = lmap ++ rmap
-                  selector.calculate(partValueMap) match {
-                    case Some(true) => Some((append(path, s"${selector.desc}_${k}"), dt))
-                    case _ => None
-                  }
-                }
-              }
-              case Some(d: Seq[Any]) => {
-                val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
-                (0 until d.size).flatMap { i =>
-                  val dt = getSingleValue(data, IndexDesc(i.toString))
-                  val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
-                  val partValueMap = lmap ++ rmap
-                  selector.calculate(partValueMap) match {
-                    case Some(true) => Some((append(path, s"${selector.desc}_${i}"), dt))
-                    case _ => None
-                  }
-                }
-              }
-            }
-          }
-        }
-        case _ => {
-          (expr.desc :: Nil, expr.calculate(existExprValueMap)) :: Nil
-        }
-      }
-    } match {
-      case Success(v) => v
-      case _ => Nil
-    }
-  }
-
-  private def calcExprsValues(data: Option[Any], exprs: Iterable[Expr], existExprValueMap: Map[String, Any]): List[Map[String, Any]] = {
-    val selectionValues: Map[String, List[(List[String], Any)]] = exprs.map { expr =>
-      (expr._id, calcExprValues((Nil, data) :: Nil, expr, existExprValueMap).flatMap { pair =>
-        pair._2 match {
-          case Some(v) => Some((pair._1, v))
-          case _ => None
-        }
-      })
-    }.toMap
-    // if exprs is empty, return an empty value map for each row
-    if (selectionValues.isEmpty) List(Map[String, Any]())
-    else SchemaValueCombineUtil.cartesian(selectionValues)
-  }
-
-  // try to calculate some exprs from data and initExprValueMap, generate a new expression value map
-  // depends on origin data and existed expr value map
-  def genExprValueMaps(data: Option[Any], exprs: Iterable[Expr], initExprValueMap: Map[String, Any]): List[Map[String, Any]] = {
-    val (selections, nonSelections) = exprs.partition(_.isInstanceOf[SelectionExpr])
-    val valueMaps = calcExprsValues(data, selections, initExprValueMap)
-    updateExprValueMaps(nonSelections, valueMaps)
-  }
-
-  // with exprValueMap, calculate expressions, update the expression value map
-  // only depends on existed expr value map, only calculation, not need origin data
-  def updateExprValueMaps(exprs: Iterable[Expr], exprValueMaps: List[Map[String, Any]]): List[Map[String, Any]] = {
-    exprValueMaps.map { valueMap =>
-      exprs.foldLeft(valueMap) { (em, expr) =>
-        expr.calculate(em) match {
-          case Some(v) => em + (expr._id -> v)
-          case _ => em
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala
deleted file mode 100644
index 5ec143f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.rule.expr._
-
-case class RuleAnalyzer(rule: StatementExpr) extends Serializable {
-
-  val constData = ""
-  private val SourceData = "source"
-  private val TargetData = "target"
-
-  val constCacheExprs: Iterable[Expr] = rule.getCacheExprs(constData)
-  private val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
-  private val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)
-
-  private val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
-  private val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)
-
-  val constFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(constData).toSet
-  private val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet
-  private val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet
-
-  private val groupbyExprPairs: Seq[(Expr, Expr)] = rule.getGroupbyExprPairs((SourceData, TargetData))
-  private val sourceGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._1)
-  private val targetGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._2)
-
-  val sourceRuleExprs: RuleExprs = RuleExprs(sourceGroupbyExprs, sourceCacheExprs,
-    sourceFinalCacheExprs, sourcePersistExprs)
-  val targetRuleExprs: RuleExprs = RuleExprs(targetGroupbyExprs, targetCacheExprs,
-    targetFinalCacheExprs, targetPersistExprs)
-
-}
-
-
-// for a single data source
-// groupbyExprs: in accuracy case, these exprs could be groupby exprs
-//                  Data keys for accuracy case, generated by the equal statements, to improve the calculation efficiency
-// cacheExprs: the exprs value could be caculated independently, and cached for later use
-//                  Cached for the finalCacheExprs calculation, it has some redundant values, saving it wastes a lot
-// finalCacheExprs: the root of cachedExprs, cached for later use, plus with persistExprs
-//                  Cached for the calculation usage, and can be saved for the re-calculation in streaming mode
-// persistExprs: the expr values should be persisted, only the direct selection exprs are persistable
-//                  Persisted for record usage, to record the missing data, need be readable as raw data
-case class RuleExprs(groupbyExprs: Seq[Expr],
-                     cacheExprs: Iterable[Expr],
-                     finalCacheExprs: Iterable[Expr],
-                     persistExprs: Iterable[Expr]
-                    ) extends Serializable {
-  // for example: for a rule "$source.name = $target.name AND $source.age < $target.age + (3 * 4)"
-  // in this rule, for the target data source, the targetRuleExprs looks like below
-  // groupbyExprs: $target.name
-  // cacheExprs: $target.name, $target.age, $target.age + (3 * 4)
-  // finalCacheExprs: $target.name, $target.age + (3 * 4), $target.age
-  // persistExprs: $target.name, $target.age
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala
deleted file mode 100644
index bbaf5cb..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.config.params.user._
-
-import scala.util.Failure
-//import org.apache.griffin.measure.rule.expr_old._
-import org.apache.griffin.measure.rule.expr._
-
-import scala.util.{Success, Try}
-
-
-case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {
-
-  val ruleParser: RuleParser = RuleParser()
-
-  def generateRule(): StatementExpr = {
-    val rules = evaluateRuleParam.rules
-    val statement = parseExpr(rules) match {
-      case Success(se) => se
-      case Failure(ex) => throw ex
-    }
-    statement
-  }
-
-  private def parseExpr(rules: String): Try[StatementExpr] = {
-    Try {
-      val result = ruleParser.parseAll(ruleParser.rule, rules)
-      if (result.successful) result.get
-      else throw new Exception("parse rule error!")
-//      throw new Exception("parse rule error!")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala
deleted file mode 100644
index 55d9f45..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.rule.expr._
-
-import scala.util.parsing.combinator._
-
-case class RuleParser() extends JavaTokenParsers with Serializable {
-
-  /**
-    * BNF representation for grammar as below:
-    *
-    * <rule> ::= <logical-statement> [WHEN <logical-statement>]
-    * rule: mapping-rule [WHEN when-rule]
-    * - mapping-rule: the first level opr should better not be OR | NOT, otherwise it can't automatically find the groupby column
-    * - when-rule: only contain the general info of data source, not the special info of each data row
-    *
-    * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
-    * logical-statement: return boolean value
-    * logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
-    *
-    * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
-    * logical-expression example: $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
-    *
-    * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
-    * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
-    * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
-    * range-expr example: ('3214', '4312', '60821'), (10, 15), ()
-    *
-    * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
-    * math-expr example: $source.price * $target.count, "hello" + " " + "world" + 123
-    *
-    * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
-    * <unary-opr> ::= "+" | "-"
-    *
-    * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
-    *
-    * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
-    * selection example: $source.price, $source.json(), $source['state'], $source.numList[3], $target.json().mails['org' = 'apache'].names[*]
-    *
-    * <selection-head> ::= $source | $target
-    *
-    * <field-sel> ::= "." <field-string>
-    *
-    * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
-    * <function-name> ::= <name-string>
-    * <arg> ::= <math-expr>
-    *
-    * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
-    * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
-    * index-field-range: 2 means the 3rd item, (0, 3) means first 4 items, * means all items, 'age' means item 'age'
-    * <index-field> ::= <index> | <field-quote> | <all-selection>
-    * index: 0 ~ n means position from start, -1 ~ -n means position from end
-    * <field-quote> ::= ' <field-string> ' | " <field-string> "
-    *
-    * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
-    * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
-    * filter-sel example: ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5 ]
-    *
-    * When <math-expr> in the selection, it mustn't contain the different <selection-head>, for example:
-    * $source.tags[1+2]             valid
-    * $source.tags[$source.first]   valid
-    * $source.tags[$target.first]   invalid
-    * -- Such job is for validation, not for parser
-    *
-    *
-    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-none>
-    * <literal-string> ::= <any-string>
-    * <literal-number> ::= <integer> | <double>
-    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
-    * <literal-boolean> ::= true | false
-    * <literal-null> ::= null | undefined
-    * <literal-none> ::= none
-    *
-    */
-
-  object Keyword {
-    def WhenKeywords: Parser[String] = """(?i)when""".r
-    def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
-    def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
-    def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
-    def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
-    def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
-  }
-  import Keyword._
-
-  object Operator {
-    def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
-    def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
-    def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
-    def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
-    def RangeOpr: Parser[String] = RangeKeywords
-
-    def UnaryMathOpr: Parser[String] = "+" | "-"
-    def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
-    def BinaryMathOpr2: Parser[String] = "+" | "-"
-
-    def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
-
-    def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
-    def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
-    def Dot: Parser[String] = "."
-    def AllSelection: Parser[String] = "*"
-    def SQuote: Parser[String] = "'"
-    def DQuote: Parser[String] = "\""
-    def Comma: Parser[String] = ","
-  }
-  import Operator._
-
-  object SomeString {
-//    def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
-    def AnyString: Parser[String] = """[^'\"]*""".r
-    def SimpleFieldString: Parser[String] = """\w+""".r
-    def FieldString: Parser[String] = """[\w\s]+""".r
-    def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
-  }
-  import SomeString._
-
-  object SomeNumber {
-    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
-    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
-    def IndexNumber: Parser[String] = IntegerNumber
-  }
-  import SomeNumber._
-
-  // -- literal --
-  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull | literialNone
-  def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
-  def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
-  def literialTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
-  def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
-  def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r) ^^ { LiteralNullExpr(_) }
-  def literialNone: Parser[LiteralNoneExpr] = """(?i)none""".r ^^ { LiteralNoneExpr(_) }
-
-  // -- selection --
-  // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
-  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
-    case head ~ selectors => SelectionExpr(head, selectors)
-  }
-  def selector: Parser[SelectExpr] = (functionOperation | fieldSelect | indexFieldRangeSelect | filterSelect)
-
-  def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
-  // <field-sel> ::= "." <field-string>
-  def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> SimpleFieldString ^^ {
-    case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
-  }
-  // <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
-  def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
-    case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
-  }
-  def argument: Parser[MathExpr] = mathExpr
-  // <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
-  def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
-    case ifrs => IndexFieldRangeSelectExpr(ifrs)
-  }
-  // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
-  def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
-    case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
-  }
-  // <index-field> ::= <index> | <field-quote> | <all-selection>
-  // *here it can parse <math-expr>, but for simple situation, not supported now*
-  def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
-  // <field-quote> ::= ' <field-string> ' | " <field-string> "
-  def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
-  // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
-  def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote ~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
-    case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
-  }
-
-  // -- math --
-  // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
-  def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
-  // <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
-  // <unary-opr> ::= "+" | "-"
-  def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
-    case Nil ~ a => a
-    case list ~ a => UnaryMathExpr(list, a)
-  }
-  // <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
-  def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ unaryMathExpr) ^^ {
-    case a ~ Nil => a
-    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
-  }
-  def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 ~ binaryMathExpr1) ^^ {
-    case a ~ Nil => a
-    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
-  }
-  def mathExpr: Parser[MathExpr] = binaryMathExpr2
-
-  // -- logical expression --
-  // <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
-  def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) <~ BracketPair._2 ^^ { RangeDesc(_) }
-  // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
-  def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
-    case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
-  } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
-    case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
-  } | mathExpr ^^ { LogicalSimpleExpr(_) }
-
-  // -- logical statement --
-  def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> logicalStatement <~ BracketPair._2
-  def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ logicalFactor ^^ {
-    case Nil ~ a => a
-    case list ~ a => UnaryLogicalExpr(list, a)
-  }
-  def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
-    case a ~ Nil => a
-    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
-  }
-  def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
-    case a ~ Nil => a
-    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
-  }
-  // <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
-  def logicalStatement: Parser[LogicalExpr] = orLogicalStatement
-
-  // -- clause statement --
-  def whereClause: Parser[WhereClauseExpr] = logicalStatement ^^ { WhereClauseExpr(_) }
-  def whenClause: Parser[WhenClauseExpr] = WhenKeywords ~> logicalStatement ^^ { WhenClauseExpr(_) }
-
-  // -- rule --
-  // <rule> ::= <logical-statement> [WHEN <logical-statement>]
-  def rule: Parser[StatementExpr] = whereClause ~ opt(whenClause) ^^ {
-    case a ~ b => StatementExpr(a, b)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala
deleted file mode 100644
index ed3b3fc..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-object SchemaValueCombineUtil {
-
-  // Map[String, List[(List[String], T)]]: Map[key, List[(path, value)]]
-  def cartesian[T](valuesMap: Map[String, List[(List[String], T)]]): List[Map[String, T]] = {
-    val fieldsList: List[(String, List[(List[String], T)])] = valuesMap.toList
-
-    // List[key, List[(path, value)]] to List[(path, (key, value))]
-    val valueList: List[(List[String], (String, T))] = fieldsList.flatMap { fields =>
-      val (key, list) = fields
-      list.map { pv =>
-        val (path, value) = pv
-        (path, (key, value))
-      }
-    }
-
-    // 1. generate tree from value list, and return root node
-    val root = TreeUtil.genRootTree(valueList)
-
-    // 2. deep first visit tree from root, merge datas into value map list
-    val valueMapList: List[Map[String, _]] = TreeUtil.mergeDatasIntoMap(root, Nil)
-
-    // 3. simple change
-    val result = valueMapList.map { mp =>
-      mp.map { kv =>
-        val (k, v) = kv
-        (k, v.asInstanceOf[T])
-      }
-    }
-
-    result
-
-  }
-
-
-  case class TreeNode(key: String, var datas: List[(String, _)]) {
-    var children = List[TreeNode]()
-    def addChild(node: TreeNode): Unit = children = children :+ node
-    def mergeSelf(node: TreeNode): Unit = datas = datas ::: node.datas
-  }
-
-  object TreeUtil {
-    private def genTree(path: List[String], datas: List[(String, _)]): Option[TreeNode] = {
-      path match {
-        case Nil => None
-        case head :: tail => {
-          genTree(tail, datas) match {
-            case Some(child) => {
-              val curNode = TreeNode(head, Nil)
-              curNode.addChild(child)
-              Some(curNode)
-            }
-            case _ => Some(TreeNode(head, datas))
-          }
-        }
-      }
-    }
-
-    private def mergeTrees(trees: List[TreeNode], newTreeOpt: Option[TreeNode]): List[TreeNode] = {
-      newTreeOpt match {
-        case Some(newTree) => {
-          trees.find(tree => tree.key == newTree.key) match {
-            case Some(tree) => {
-              // children merge
-              for (child <- newTree.children) {
-                tree.children = mergeTrees(tree.children, Some(child))
-              }
-              // self data merge
-              tree.mergeSelf(newTree)
-              trees
-            }
-            case _ => trees :+ newTree
-          }
-        }
-        case _ => trees
-      }
-    }
-
-    private def root(): TreeNode = TreeNode("", Nil)
-
-    def genRootTree(values: List[(List[String], (String, _))]): TreeNode = {
-      val rootNode = root()
-      val nodeOpts = values.map(value => genTree(value._1, value._2 :: Nil))
-      rootNode.children = nodeOpts.foldLeft(List[TreeNode]()) { (trees, treeOpt) =>
-        mergeTrees(trees, treeOpt)
-      }
-      rootNode
-    }
-
-    private def add(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]):  List[Map[String, _]] = {
-      mapList1 ::: mapList2
-    }
-    private def multiply(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]):  List[Map[String, _]] = {
-      mapList1.flatMap { map1 =>
-        mapList2.map { map2 =>
-          map1 ++ map2
-        }
-      }
-    }
-
-    private def keysList(mapList: List[Map[String, _]]): List[String] = {
-      val keySet = mapList match {
-        case Nil => Set[String]()
-        case head :: _ => head.keySet
-      }
-      keySet.toList
-    }
-
-    def mergeDatasIntoMap(root: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
-      val childrenKeysMapDatas = root.children.foldLeft(Map[List[String], List[Map[String, _]]]()) { (keysMap, child) =>
-        val childMdts = mergeDatasIntoMap(child, List[Map[String, _]]())
-        childMdts match {
-          case Nil => keysMap
-          case _ => {
-            val keys = keysList(childMdts)
-            val afterList = keysMap.get(keys) match {
-              case Some(list) => add(list, childMdts)
-              case _ => childMdts
-            }
-            keysMap + (keys -> afterList)
-          }
-        }
-      }
-      val childrenMergeMaps = childrenKeysMapDatas.values.foldLeft(List[Map[String, _]]()) { (originList, list) =>
-        originList match {
-          case Nil => list
-          case _ => multiply(originList, list)
-        }
-      }
-      val result = mergeNodeChildrenDatasIntoMap(root, childrenMergeMaps)
-      result
-    }
-
-    private def mergeNodeChildrenDatasIntoMap(node: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
-      val datas: List[(String, (String, Any))] = node.children.flatMap { child =>
-        child.datas.map(dt => (dt._1, (child.key, dt._2)))
-      }
-      val childrenDataKeys: Set[String] = datas.map(_._1).toSet
-      val childrenDataLists: Map[String, List[(String, _)]] = datas.foldLeft(childrenDataKeys.map(k => (k, List[(String, _)]())).toMap) { (maps, data) =>
-        maps.get(data._1) match {
-          case Some(list) => maps + (data._1 -> (list :+ data._2))
-          case _ => maps
-        }
-      }
-
-      // multiply different key datas
-      childrenDataLists.foldLeft(mapDatas) { (mdts, klPair) =>
-        val (key, list) = klPair
-        mdts match {
-          case Nil => list.map(pr => Map[String, Any]((key -> pr._2)))
-          case _ => {
-            list.flatMap { kvPair =>
-              val (path, value) = kvPair
-              mdts.map { mp =>
-                mp + (key -> value)
-              }
-            }
-          }
-        }
-      }
-
-    }
-  }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/AdaptPhase.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/AdaptPhase.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/AdaptPhase.scala
new file mode 100644
index 0000000..26db78d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/AdaptPhase.scala
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+sealed trait AdaptPhase {}
+
+final case object PreProcPhase extends AdaptPhase {}
+
+final case object RunPhase extends AdaptPhase {}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
new file mode 100644
index 0000000..eb57838
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.rule.step._
+
+case class DataFrameOprAdaptor(adaptPhase: AdaptPhase) extends RuleAdaptor {
+
+  def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
+    DfOprStep(getName(param), getRule(param), getDetails(param),
+      getPersistType(param), getUpdateDataSource(param)) :: Nil
+  }
+  def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
+    ruleStep match {
+      case rs @ DfOprStep(_, _, _, _, _) => rs :: Nil
+      case _ => Nil
+    }
+  }
+
+  def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
+    param.get(_name) match {
+      case Some(name) => name.toString :: Nil
+      case _ => Nil
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
new file mode 100644
index 0000000..2a189d4
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -0,0 +1,349 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.dsl.analyzer._
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.dsl.parser.GriffinDslParser
+import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+case class GriffinDslAdaptor(dataSourceNames: Seq[String],
+                             functionNames: Seq[String],
+                             adaptPhase: AdaptPhase
+                            ) extends RuleAdaptor {
+
+  object StepInfo {
+    val _Name = "name"
+    val _PersistType = "persist.type"
+    val _UpdateDataSource = "update.data.source"
+    def getNameOpt(param: Map[String, Any]): Option[String] = param.get(_Name).map(_.toString)
+    def getPersistType(param: Map[String, Any]): PersistType = PersistType(param.getString(_PersistType, ""))
+    def getUpdateDataSourceOpt(param: Map[String, Any]): Option[String] = param.get(_UpdateDataSource).map(_.toString)
+  }
+  object AccuracyInfo {
+    val _Source = "source"
+    val _Target = "target"
+    val _MissRecords = "miss.records"
+    val _Accuracy = "accuracy"
+    val _Miss = "miss"
+    val _Total = "total"
+    val _Matched = "matched"
+  }
+  object ProfilingInfo {
+    val _Source = "source"
+    val _Profiling = "profiling"
+  }
+
+  def getNameOpt(param: Map[String, Any], key: String): Option[String] = param.get(key).map(_.toString)
+  def resultName(param: Map[String, Any], key: String): String = {
+    val nameOpt = param.get(key) match {
+      case Some(prm: Map[String, Any]) => StepInfo.getNameOpt(prm)
+      case _ => None
+    }
+    nameOpt.getOrElse(key)
+  }
+  def resultPersistType(param: Map[String, Any], key: String, defPersistType: PersistType): PersistType = {
+    param.get(key) match {
+      case Some(prm: Map[String, Any]) => StepInfo.getPersistType(prm)
+      case _ => defPersistType
+    }
+  }
+  def resultUpdateDataSourceOpt(param: Map[String, Any], key: String): Option[String] = {
+    param.get(key) match {
+      case Some(prm: Map[String, Any]) => StepInfo.getUpdateDataSourceOpt(prm)
+      case _ => None
+    }
+  }
+
+  val _dqType = "dq.type"
+
+  protected def getDqType(param: Map[String, Any]) = DqType(param.getString(_dqType, ""))
+
+  val filteredFunctionNames = functionNames.filter { fn =>
+    fn.matches("""^[a-zA-Z_]\w*$""")
+  }
+  val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames)
+
+  def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
+    GriffinDslStep(getName(param), getRule(param), getDqType(param), getDetails(param)) :: Nil
+  }
+
+  def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
+    val dqType = getDqType(param)
+    param.get(_name) match {
+      case Some(name) => {
+        dqType match {
+          case AccuracyType => {
+            Seq[String](
+              resultName(param, AccuracyInfo._MissRecords),
+              resultName(param, AccuracyInfo._Accuracy)
+            )
+          }
+          case ProfilingType => {
+            Seq[String](
+              resultName(param, ProfilingInfo._Profiling)
+            )
+          }
+          case TimelinessType => {
+            Nil
+          }
+          case _ => Nil
+        }
+      }
+      case _ => Nil
+    }
+  }
+
+  def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
+    ruleStep match {
+      case rs @ GriffinDslStep(_, rule, dqType, _) => {
+        val exprOpt = try {
+          val result = parser.parseRule(rule, dqType)
+          if (result.successful) Some(result.get)
+          else {
+            println(result)
+            warn(s"adapt concrete rule step warn: parse rule [ ${rule} ] fails")
+            None
+          }
+        } catch {
+          case e: Throwable => {
+            error(s"adapt concrete rule step error: ${e.getMessage}")
+            None
+          }
+        }
+
+        exprOpt match {
+          case Some(expr) => {
+            try {
+              transConcreteRuleSteps(rs, expr)
+            } catch {
+              case e: Throwable => {
+                error(s"trans concrete rule step error: ${e.getMessage}")
+                Nil
+              }
+            }
+          }
+          case _ => Nil
+        }
+      }
+      case _ => Nil
+    }
+  }
+
+  private def transConcreteRuleSteps(ruleStep: GriffinDslStep, expr: Expr
+                                    ): Seq[ConcreteRuleStep] = {
+    val details = ruleStep.details
+    ruleStep.dqType match {
+      case AccuracyType => {
+        val sourceName = getNameOpt(details, AccuracyInfo._Source) match {
+          case Some(name) => name
+          case _ => dataSourceNames.head
+        }
+        val targetName = getNameOpt(details, AccuracyInfo._Target) match {
+          case Some(name) => name
+          case _ => dataSourceNames.tail.head
+        }
+        val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+
+
+        if (!checkDataSourceExists(sourceName)) {
+          Nil
+        } else {
+          // 1. miss record
+          val missRecordsSql = if (!checkDataSourceExists(targetName)) {
+            val selClause = s"`${sourceName}`.*"
+            s"SELECT ${selClause} FROM `${sourceName}`"
+          } else {
+            val selClause = s"`${sourceName}`.*"
+            val onClause = expr.coalesceDesc
+            val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+              s"${sel.desc} IS NULL"
+            }.mkString(" AND ")
+            val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+              s"${sel.desc} IS NULL"
+            }.mkString(" AND ")
+            val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+            s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+          }
+          val missRecordsName = resultName(details, AccuracyInfo._MissRecords)
+          val missRecordsStep = SparkSqlStep(
+            missRecordsName,
+            missRecordsSql,
+            Map[String, Any](),
+            resultPersistType(details, AccuracyInfo._MissRecords, RecordPersistType),
+            resultUpdateDataSourceOpt(details, AccuracyInfo._MissRecords)
+          )
+
+          // 2. miss count
+          val missTableName = "_miss_"
+          val missColName = getNameOpt(details, AccuracyInfo._Miss).getOrElse(AccuracyInfo._Miss)
+          val missSql = {
+            s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsName}` GROUP BY `${GroupByColumn.tmst}`"
+          }
+          val missStep = SparkSqlStep(
+            missTableName,
+            missSql,
+            Map[String, Any](),
+            NonePersistType,
+            None
+          )
+
+          // 3. total count
+          val totalTableName = "_total_"
+          val totalColName = getNameOpt(details, AccuracyInfo._Total).getOrElse(AccuracyInfo._Total)
+          val totalSql = {
+            s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${GroupByColumn.tmst}`"
+          }
+          val totalStep = SparkSqlStep(
+            totalTableName,
+            totalSql,
+            Map[String, Any](),
+            NonePersistType,
+            None
+          )
+
+          // 4. accuracy metric
+          val matchedColName = getNameOpt(details, AccuracyInfo._Matched).getOrElse(AccuracyInfo._Matched)
+          val accuracyMetricSql = {
+            s"""
+               |SELECT `${totalTableName}`.`${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`,
+               |`${missTableName}`.`${missColName}` AS `${missColName}`,
+               |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
+               |FROM `${totalTableName}` FULL JOIN `${missTableName}`
+               |ON `${totalTableName}`.`${GroupByColumn.tmst}` = `${missTableName}`.`${GroupByColumn.tmst}`
+          """.stripMargin
+          }
+          val accuracyMetricName = resultName(details, AccuracyInfo._Accuracy)
+          val accuracyMetricStep = SparkSqlStep(
+            accuracyMetricName,
+            accuracyMetricSql,
+            details,
+            //          resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType)
+            NonePersistType,
+            None
+          )
+
+          // 5. accuracy metric filter
+          val accuracyStep = DfOprStep(
+            accuracyMetricName,
+            "accuracy",
+            Map[String, Any](
+              ("df.name" -> accuracyMetricName),
+              ("miss" -> missColName),
+              ("total" -> totalColName),
+              ("matched" -> matchedColName),
+              ("tmst" -> GroupByColumn.tmst)
+            ),
+            resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType),
+            None
+          )
+
+          missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: accuracyStep :: Nil
+        }
+      }
+      case ProfilingType => {
+        val sourceName = getNameOpt(details, ProfilingInfo._Source) match {
+          case Some(name) => name
+          case _ => dataSourceNames.head
+        }
+        val analyzer = ProfilingAnalyzer(expr.asInstanceOf[ProfilingClause], sourceName)
+
+        analyzer.selectionExprs.foreach(println)
+
+        val selExprDescs = analyzer.selectionExprs.map { sel =>
+          val alias = sel match {
+            case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+            case _ => ""
+          }
+          s"${sel.desc}${alias}"
+        }
+
+//        val selClause = (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ")
+        val selClause = if (analyzer.containsAllSelectionExpr) {
+          selExprDescs.mkString(", ")
+        } else {
+          (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ")
+        }
+
+//        val tailClause = analyzer.tailsExprs.map(_.desc).mkString(" ")
+        val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${GroupByColumn.tmst}`") :: Nil, None)
+        val mergedGroubbyClause = tmstGroupbyClause.merge(analyzer.groupbyExprOpt match {
+          case Some(gbc) => gbc
+          case _ => GroupbyClause(Nil, None)
+        })
+        val groupbyClause = mergedGroubbyClause.desc
+        val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+        val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
+
+        if (!checkDataSourceExists(sourceName)) {
+          Nil
+        } else {
+          // 1. select statement
+          val profilingSql = {
+//            s"SELECT `${GroupByColumn.tmst}`, ${selClause} FROM ${sourceName} ${tailClause} GROUP BY `${GroupByColumn.tmst}`"
+            s"SELECT ${selClause} FROM ${sourceName} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+          }
+          val profilingMetricName = resultName(details, ProfilingInfo._Profiling)
+          val profilingStep = SparkSqlStep(
+            profilingMetricName,
+            profilingSql,
+            details,
+            resultPersistType(details, ProfilingInfo._Profiling, MetricPersistType),
+            None
+          )
+
+          // 2. clear processed data
+//          val clearDataSourceStep = DfOprStep(
+//            s"${sourceName}_clear",
+//            "clear",
+//            Map[String, Any](
+//              ("df.name" -> sourceName)
+//            ),
+//            NonePersistType,
+//            Some(sourceName)
+//          )
+//
+//          profilingStep :: clearDataSourceStep :: Nil
+
+          profilingStep:: Nil
+        }
+
+      }
+      case TimelinessType => {
+        Nil
+      }
+      case _ => Nil
+    }
+  }
+
+  private def checkDataSourceExists(name: String): Boolean = {
+    try {
+      RuleAdaptorGroup.dataChecker.existDataSourceName(name)
+    } catch {
+      case e: Throwable => {
+        error(s"check data source exists error: ${e.getMessage}")
+        false
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
new file mode 100644
index 0000000..744f52a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import java.util.concurrent.atomic.AtomicLong
+
+
+import scala.collection.mutable.{Set => MutableSet}
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.rule.step.{ConcreteRuleStep, RuleStep}
+import org.apache.griffin.measure.rule.dsl.{DslType, PersistType}
+
+trait RuleAdaptor extends Loggable with Serializable {
+
+  val adaptPhase: AdaptPhase
+
+  val _name = "name"
+  val _rule = "rule"
+  val _persistType = "persist.type"
+  val _updateDataSource = "update.data.source"
+  val _details = "details"
+
+  protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString
+  protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString
+  protected def getPersistType(param: Map[String, Any]) = PersistType(param.getOrElse(_persistType, "").toString)
+  protected def getUpdateDataSource(param: Map[String, Any]) = param.get(_updateDataSource).map(_.toString)
+  protected def getDetails(param: Map[String, Any]) = param.get(_details) match {
+    case Some(dt: Map[String, Any]) => dt
+    case _ => Map[String, Any]()
+  }
+
+  def getTempSourceNames(param: Map[String, Any]): Seq[String]
+
+  def genRuleStep(param: Map[String, Any]): Seq[RuleStep]
+  def genConcreteRuleStep(param: Map[String, Any]): Seq[ConcreteRuleStep] = {
+    genRuleStep(param).flatMap { rs =>
+      adaptConcreteRuleStep(rs)
+    }
+  }
+  protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep]
+
+}
+
+object RuleStepNameGenerator {
+  private val counter: AtomicLong = new AtomicLong(0L)
+  private val head: String = "rs"
+
+  def genName: String = {
+    s"${head}${increment}"
+  }
+
+  private def increment: Long = {
+    counter.incrementAndGet()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
new file mode 100644
index 0000000..237902a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
@@ -0,0 +1,105 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.check.DataChecker
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.step._
+import org.apache.spark.sql.SQLContext
+
+import scala.collection.mutable.{Map => MutableMap}
+
+object RuleAdaptorGroup {
+
+  val _dslType = "dsl.type"
+
+  var dataSourceNames: Seq[String] = _
+  var functionNames: Seq[String] = _
+
+  var dataChecker: DataChecker = _
+
+  def init(sqlContext: SQLContext, dsNames: Seq[String]): Unit = {
+    val functions = sqlContext.sql("show functions")
+    functionNames = functions.map(_.getString(0)).collect
+    dataSourceNames = dsNames
+
+    dataChecker = DataChecker(sqlContext)
+  }
+
+  private def getDslType(param: Map[String, Any], defDslType: DslType) = {
+    val dt = DslType(param.getOrElse(_dslType, "").toString)
+    dt match {
+      case UnknownDslType => defDslType
+      case _ => dt
+    }
+  }
+
+  private def genRuleAdaptor(dslType: DslType, dsNames: Seq[String], adaptPhase: AdaptPhase): Option[RuleAdaptor] = {
+    dslType match {
+      case SparkSqlType => Some(SparkSqlAdaptor(adaptPhase))
+      case DfOprType => Some(DataFrameOprAdaptor(adaptPhase))
+      case GriffinDslType => Some(GriffinDslAdaptor(dsNames, functionNames, adaptPhase))
+      case _ => None
+    }
+  }
+
+//  def genRuleSteps(evaluateRuleParam: EvaluateRuleParam): Seq[RuleStep] = {
+//    val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else evaluateRuleParam.dslType
+//    val defaultDslType = DslType(dslTypeStr)
+//    val rules = evaluateRuleParam.rules
+//    var dsNames = dataSourceNames
+//    val steps = rules.flatMap { param =>
+//      val dslType = getDslType(param)
+//      genRuleAdaptor(dslType) match {
+//        case Some(ruleAdaptor) => ruleAdaptor.genRuleStep(param)
+//        case _ => Nil
+//      }
+//    }
+//    steps.foreach(println)
+//    steps
+//  }
+
+  def genConcreteRuleSteps(evaluateRuleParam: EvaluateRuleParam,
+                           adaptPhase: AdaptPhase
+                          ): Seq[ConcreteRuleStep] = {
+    val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else evaluateRuleParam.dslType
+    val defaultDslType = DslType(dslTypeStr)
+    val ruleParams = evaluateRuleParam.rules
+    genConcreteRuleSteps(ruleParams, defaultDslType, adaptPhase)
+  }
+
+  def genConcreteRuleSteps(ruleParams: Seq[Map[String, Any]],
+                           defDslType: DslType, adaptPhase: AdaptPhase
+                          ): Seq[ConcreteRuleStep] = {
+    val (steps, dsNames) = ruleParams.foldLeft((Seq[ConcreteRuleStep](), dataSourceNames)) { (res, param) =>
+      val (preSteps, preNames) = res
+      val dslType = getDslType(param, defDslType)
+      val (curSteps, curNames) = genRuleAdaptor(dslType, preNames, adaptPhase) match {
+        case Some(ruleAdaptor) => (ruleAdaptor.genConcreteRuleStep(param), preNames ++ ruleAdaptor.getTempSourceNames(param))
+        case _ => (Nil, preNames)
+      }
+      (preSteps ++ curSteps, curNames)
+    }
+    steps
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
new file mode 100644
index 0000000..78121fa
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.rule.step._
+
+case class SparkSqlAdaptor(adaptPhase: AdaptPhase) extends RuleAdaptor {
+
+  def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
+    SparkSqlStep(getName(param), getRule(param), getDetails(param),
+      getPersistType(param), getUpdateDataSource(param)) :: Nil
+  }
+  def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
+    ruleStep match {
+      case rs @ SparkSqlStep(name, rule, details, persistType, udsOpt) => {
+        adaptPhase match {
+          case PreProcPhase => rs :: Nil
+          case RunPhase => {
+            val repSel = rule.replaceFirst("(?i)select", s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`,")
+            val groupbyRule = repSel.concat(s" GROUP BY `${GroupByColumn.tmst}`")
+            val nrs = SparkSqlStep(name, groupbyRule, details, persistType, udsOpt)
+            nrs :: Nil
+          }
+        }
+      }
+      case _ => Nil
+    }
+  }
+
+  def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
+    param.get(_name) match {
+      case Some(name) => name.toString :: Nil
+      case _ => Nil
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
new file mode 100644
index 0000000..ac27403
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl
+
+import scala.util.matching.Regex
+
+
+sealed trait DqType {
+  val regex: Regex
+  val desc: String
+}
+
+object DqType {
+  private val dqTypes: List[DqType] = List(AccuracyType, ProfilingType, TimelinessType, UnknownType)
+  def apply(ptn: String): DqType = {
+    dqTypes.filter(tp => ptn match {
+      case tp.regex() => true
+      case _ => false
+    }).headOption.getOrElse(UnknownType)
+  }
+  def unapply(pt: DqType): Option[String] = Some(pt.desc)
+}
+
+final case object AccuracyType extends DqType {
+  val regex = "^(?i)accuracy$".r
+  val desc = "accuracy"
+}
+
+final case object ProfilingType extends DqType {
+  val regex = "^(?i)profiling$".r
+  val desc = "profiling$"
+}
+
+final case object TimelinessType extends DqType {
+  val regex = "^(?i)timeliness$".r
+  val desc = "timeliness"
+}
+
+final case object UnknownType extends DqType {
+  val regex = "".r
+  val desc = "unknown"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
new file mode 100644
index 0000000..cfda393
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl
+
+import scala.util.matching.Regex
+
+
+sealed trait DslType {
+  val regex: Regex
+  val desc: String
+}
+
+object DslType {
+  private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType, DfOprType, UnknownDslType)
+  def apply(ptn: String): DslType = {
+    dslTypes.filter(tp => ptn match {
+      case tp.regex() => true
+      case _ => false
+    }).headOption.getOrElse(UnknownDslType)
+  }
+  def unapply(pt: DslType): Option[String] = Some(pt.desc)
+}
+
+final case object SparkSqlType extends DslType {
+  val regex = "^(?i)spark-?sql$".r
+  val desc = "spark-sql"
+}
+
+final case object DfOprType extends DslType {
+  val regex = "^(?i)df-?opr$".r
+  val desc = "df-opr"
+}
+
+final case object GriffinDslType extends DslType {
+  val regex = "^(?i)griffin-?dsl$".r
+  val desc = "griffin-dsl"
+}
+
+final case object UnknownDslType extends DslType {
+  val regex = "".r
+  val desc = "unknown"
+}
\ No newline at end of file