Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:22 UTC
[07/11] incubator-griffin git commit: Dsl modify
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala
deleted file mode 100644
index c969012..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import scala.util.{Success, Try}
-
-
-object CalculationUtil {
-
- implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v)
-
- // redefines how operators calculate over optional values in the DSL
- case class CalculationValue(value: Option[_]) extends Serializable {
-
- def + (other: Option[_]): Option[_] = {
- Try {
- (value, other) match {
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString)
- case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte)
- case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
- case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt)
- case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
- case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
- case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
- case _ => value
- }
- } match {
- case Success(opt) => opt
- case _ => None
- }
- }
-
- def - (other: Option[_]): Option[_] = {
- Try {
- (value, other) match {
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
- case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
- case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
- case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong)
- case (Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat)
- case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble)
- case _ => value
- }
- } match {
- case Success(opt) => opt
- case _ => None
- }
- }
-
- def * (other: Option[_]): Option[_] = {
- Try {
- (value, other) match {
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
- case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
- case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
- case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort)
- case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt)
- case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong)
- case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat)
- case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble)
- case _ => value
- }
- } match {
- case Success(opt) => opt
- case _ => None
- }
- }
-
- def / (other: Option[_]): Option[_] = {
- Try {
- (value, other) match {
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
- case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
- case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
- case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong)
- case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat)
- case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble)
- case _ => value
- }
- } match {
- case Success(opt) => opt
- case _ => None
- }
- }
-
- def % (other: Option[_]): Option[_] = {
- Try {
- (value, other) match {
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
- case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
- case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
- case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong)
- case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat)
- case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble)
- case _ => value
- }
- } match {
- case Success(opt) => opt
- case _ => None
- }
- }
-
- def unary_- (): Option[_] = {
- value match {
- case None => None
- case Some(null) => None
- case Some(v: String) => Some(v.reverse.toString)
- case Some(v: Boolean) => Some(!v)
- case Some(v: Byte) => Some(-v)
- case Some(v: Short) => Some(-v)
- case Some(v: Int) => Some(-v)
- case Some(v: Long) => Some(-v)
- case Some(v: Float) => Some(-v)
- case Some(v: Double) => Some(-v)
- case Some(v) => Some(v)
- case _ => None
- }
- }
-
-
- def === (other: Option[_]): Option[Boolean] = {
- (value, other) match {
- case (None, None) => Some(true)
- case (Some(v1), Some(v2)) => Some(v1 == v2)
- case _ => Some(false)
- }
- }
-
- def =!= (other: Option[_]): Option[Boolean] = {
- (value, other) match {
- case (None, None) => Some(false)
- case (Some(v1), Some(v2)) => Some(v1 != v2)
- case _ => Some(true)
- }
- }
-
- def > (other: Option[_]): Option[Boolean] = {
- Try {
- (value, other) match {
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(v1: String), Some(v2: String)) => Some(v1 > v2)
- case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble)
- case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
- case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble)
- case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble)
- case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble)
- case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble)
- case _ => None
- }
- } match {
- case Success(opt) => opt
- case _ => None
- }
- }
-
- def >= (other: Option[_]): Option[Boolean] = {
- Try {
- (value, other) match {
- case (None, None) | (Some(null), Some(null)) => Some(true)
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
- case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
- case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
- case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble)
- case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble)
- case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble)
- case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble)
- case _ => None
- }
- } match {
- case Success(opt) => opt
- case _ => None
- }
- }
-
- def < (other: Option[_]): Option[Boolean] = {
- Try {
- (value, other) match {
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
- case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
- case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
- case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble)
- case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble)
- case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble)
- case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble)
- case _ => None
- }
- } match {
- case Success(opt) => opt
- case _ => None
- }
- }
-
- def <= (other: Option[_]): Option[Boolean] = {
- Try {
- (value, other) match {
- case (None, None) | (Some(null), Some(null)) => Some(true)
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
- case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
- case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
- case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble)
- case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble)
- case (Some(v1: Float), Some(v2)) => Some(v1 <= v2.toString.toDouble)
- case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble)
- case _ => None
- }
- } match {
- case Success(opt) => opt
- case _ => None
- }
- }
-
-
- def in (other: Iterable[Option[_]]): Option[Boolean] = {
- other.foldLeft(Some(false): Option[Boolean]) { (res, next) =>
- optOr(res, ===(next))
- }
- }
-
- def not_in (other: Iterable[Option[_]]): Option[Boolean] = {
- other.foldLeft(Some(true): Option[Boolean]) { (res, next) =>
- optAnd(res, =!=(next))
- }
- }
-
- def between (other: Iterable[Option[_]]): Option[Boolean] = {
- if (other.size < 2) None else {
- val (begin, end) = (other.head, other.tail.head)
- if (begin.isEmpty && end.isEmpty) Some(value.isEmpty)
- else optAnd(>=(begin), <=(end))
- }
- }
-
- def not_between (other: Iterable[Option[_]]): Option[Boolean] = {
- if (other.size < 2) None else {
- val (begin, end) = (other.head, other.tail.head)
- if (begin.isEmpty && end.isEmpty) Some(value.nonEmpty)
- else optOr(<(begin), >(end))
- }
- }
-
- def unary_! (): Option[Boolean] = {
- optNot(value)
- }
-
- def && (other: Option[_]): Option[Boolean] = {
- optAnd(value, other)
- }
-
- def || (other: Option[_]): Option[Boolean] = {
- optOr(value, other)
- }
-
-
- private def optNot(a: Option[_]): Option[Boolean] = {
- a match {
- case None => None
- case Some(null) => None
- case Some(v: Boolean) => Some(!v)
- case _ => None
- }
- }
- private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = {
- (a, b) match {
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(false), _) | (_, Some(false)) => Some(false)
- case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
- case _ => None
- }
- }
- private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = {
- (a, b) match {
- case (None, _) | (_, None) => None
- case (Some(null), _) | (_, Some(null)) => None
- case (Some(true), _) | (_, Some(true)) => Some(true)
- case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
- case _ => None
- }
- }
- }
-
-}
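For context on the removed CalculationUtil: the implicit conversion above let rule evaluation apply ordinary operators directly to Option values, with None propagating through arithmetic and Try guarding against cast failures. A minimal sketch of the behavior (values are illustrative):

    import org.apache.griffin.measure.rule.CalculationUtil._

    val price: Option[Any] = Some(3)
    val count: Option[Any] = Some(4)

    price + count                  // Some(7): Int arithmetic, both sides defined
    price + None                   // None: a missing operand makes the result missing
    price > count                  // Some(false): numeric compare via toString.toDouble
    (None: Option[Any]) === None   // Some(true): two absent values compare equal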
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala
deleted file mode 100644
index 9d027ec..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.spark.sql.types._
-
-object DataTypeCalculationUtil {
-
- implicit def dataType2CalculationType(tp: DataType): CalculationType = CalculationType(tp)
-
- case class CalculationType(tp: DataType) extends Serializable {
- def binaryOpr (other: DataType): DataType = {
- (tp, other) match {
- case (NullType, _) | (_, NullType) => NullType
- case (t, _) => t
- }
- }
- def unaryOpr (): DataType = {
- tp
- }
- }
-
- case class DataTypeException() extends Exception {}
-
- def getDataType(value: Any): DataType = {
- value match {
- case v: String => StringType
- case v: Boolean => BooleanType
- case v: Long => LongType
- case v: Int => IntegerType
- case v: Short => ShortType
- case v: Byte => ByteType
- case v: Double => DoubleType
- case v: Float => FloatType
- case v: Map[_, _] => MapType(getSameDataType(v.keys), getSameDataType(v.values))
- case v: Iterable[_] => ArrayType(getSameDataType(v))
- case _ => NullType
- }
- }
-
- private def getSameDataType(values: Iterable[Any]): DataType = {
- values.foldLeft(NullType: DataType)((a, c) => genericTypeOf(a, getDataType(c)))
- }
-
- private def genericTypeOf(dt1: DataType, dt2: DataType): DataType = {
- if (dt1 == dt2) dt1 else {
- dt1 match {
- case NullType => dt2
- case StringType => dt1
- case DoubleType => {
- dt2 match {
- case StringType => dt2
- case DoubleType | FloatType | LongType | IntegerType | ShortType | ByteType => dt1
- case _ => throw DataTypeException()
- }
- }
- case FloatType => {
- dt2 match {
- case StringType | DoubleType => dt2
- case FloatType | LongType | IntegerType | ShortType | ByteType => dt1
- case _ => throw DataTypeException()
- }
- }
- case LongType => {
- dt2 match {
- case StringType | DoubleType | FloatType => dt2
- case LongType | IntegerType | ShortType | ByteType => dt1
- case _ => throw DataTypeException()
- }
- }
- case IntegerType => {
- dt2 match {
- case StringType | DoubleType | FloatType | LongType => dt2
- case IntegerType | ShortType | ByteType => dt1
- case _ => throw DataTypeException()
- }
- }
- case ShortType => {
- dt2 match {
- case StringType | DoubleType | FloatType | LongType | IntegerType => dt2
- case ShortType | ByteType => dt1
- case _ => throw DataTypeException()
- }
- }
- case ByteType => {
- dt2 match {
- case StringType | DoubleType | FloatType | LongType | IntegerType | ShortType => dt2
- case ByteType => dt1
- case _ => throw DataTypeException()
- }
- }
- case BooleanType => {
- dt2 match {
- case StringType => dt2
- case BooleanType => dt1
- case _ => throw DataTypeException()
- }
- }
- case MapType(kdt1, vdt1, _) => {
- dt2 match {
- case MapType(kdt2, vdt2, _) => MapType(genericTypeOf(kdt1, kdt2), genericTypeOf(vdt1, vdt2))
- case _ => throw DataTypeException()
- }
- }
- case ArrayType(vdt1, _) => {
- dt2 match {
- case ArrayType(vdt2, _) => ArrayType(genericTypeOf(vdt1, vdt2))
- case _ => throw DataTypeException()
- }
- }
- case _ => throw DataTypeException()
- }
- }
- }
-
- def sequenceDataTypeMap(aggr: Map[String, DataType], value: Map[String, Any]): Map[String, DataType] = {
- val dataTypes = value.foldLeft(Map[String, DataType]()) { (map, pair) =>
- val (k, v) = pair
- try {
- map + (k -> getDataType(v))
- } catch {
- case e: DataTypeException => map
- }
- }
- combineDataTypeMap(aggr, dataTypes)
- }
-
- def combineDataTypeMap(aggr1: Map[String, DataType], aggr2: Map[String, DataType]): Map[String, DataType] = {
- aggr2.foldLeft(aggr1) { (a, c) =>
- a.get(c._1) match {
- case Some(t) => {
- try {
- a + (c._1 -> genericTypeOf(t, c._2))
- } catch {
- case e: DataTypeException => a
- }
- }
- case _ => a + c
- }
- }
- }
-
-}
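The removed DataTypeCalculationUtil inferred Spark SQL types from raw values and widened pairs of types (Byte up through Double, then String) when aggregating schemas across rows. A minimal sketch (values are illustrative):

    import org.apache.griffin.measure.rule.DataTypeCalculationUtil._
    import org.apache.spark.sql.types._

    getDataType("abc")            // StringType
    getDataType(Seq(1, 2L))       // ArrayType(LongType): Int widens to Long
    getDataType(Map("a" -> 1.5))  // MapType(StringType, DoubleType)

    // aggregating field types across two rows: Int and Long combine to Long
    val m1 = sequenceDataTypeMap(Map[String, DataType](), Map("id" -> 1))
    val m2 = sequenceDataTypeMap(m1, Map("id" -> 2L))
    // m2 == Map("id" -> LongType)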
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala
deleted file mode 100644
index 940d0cb..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.rule.func._
-import org.apache.spark.sql.Row
-
-import scala.util.{Success, Try}
-
-object ExprValueUtil {
-
- private def append(path: List[String], step: String): List[String] = {
- path :+ step
- }
-
- private def value2Map(key: String, value: Option[Any]): Map[String, Any] = {
- value.flatMap(v => Some((key -> v))).toMap
- }
-
- private def getSingleValue(data: Option[Any], desc: FieldDescOnly): Option[Any] = {
- data match {
- case Some(row: Row) => {
- desc match {
- case i: IndexDesc => try { Some(row.getAs[Any](i.index)) } catch { case _ => None }
- case f: FieldDesc => try { Some(row.getAs[Any](f.field)) } catch { case _ => None }
- case _ => None
- }
- }
- case Some(d: Map[String, Any]) => {
- desc match {
- case f: FieldDesc => d.get(f.field)
- case _ => None
- }
- }
- case Some(d: Seq[Any]) => {
- desc match {
- case i: IndexDesc => if (i.index >= 0 && i.index < d.size) Some(d(i.index)) else None
- case _ => None
- }
- }
- case _ => None
- }
- }
-
- private def calcExprValues(pathDatas: List[(List[String], Option[Any])], expr: Expr, existExprValueMap: Map[String, Any]): List[(List[String], Option[Any])] = {
- Try {
- expr match {
- case selection: SelectionExpr => {
- selection.selectors.foldLeft(pathDatas) { (pds, selector) =>
- calcExprValues(pds, selector, existExprValueMap)
- }
- }
- case selector: IndexFieldRangeSelectExpr => {
- pathDatas.flatMap { pathData =>
- val (path, data) = pathData
- data match {
- case Some(row: Row) => {
- selector.fields.flatMap { field =>
- field match {
- case (_: IndexDesc) | (_: FieldDesc) => {
- getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
- }
- case a: AllFieldsDesc => {
- (0 until row.size).flatMap { i =>
- getSingleValue(data, IndexDesc(i.toString)).map { v =>
- (append(path, s"${a.desc}_${i}"), Some(v))
- }
- }.toList
- }
- case r: FieldRangeDesc => {
- (r.startField, r.endField) match {
- case (si: IndexDesc, ei: IndexDesc) => {
- (si.index to ei.index).flatMap { i =>
- (append(path, s"${r.desc}_${i}"), getSingleValue(data, IndexDesc(i.toString)))
- getSingleValue(data, IndexDesc(i.toString)).map { v =>
- (append(path, s"${r.desc}_${i}"), Some(v))
- }
- }.toList
- }
- case _ => Nil
- }
- }
- case _ => Nil
- }
- }
- }
- case Some(d: Map[String, Any]) => {
- selector.fields.flatMap { field =>
- field match {
- case (_: IndexDesc) | (_: FieldDesc) => {
- getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
- }
- case a: AllFieldsDesc => {
- d.keySet.flatMap { k =>
- getSingleValue(data, FieldDesc(k)).map { v =>
- (append(path, s"${a.desc}_${k}"), Some(v))
- }
- }
- }
- case _ => None
- }
- }
- }
- case Some(d: Seq[Any]) => {
- selector.fields.flatMap { field =>
- field match {
- case (_: IndexDesc) | (_: FieldDesc) => {
- getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
- }
- case a: AllFieldsDesc => {
- (0 until d.size).flatMap { i =>
- (append(path, s"${a.desc}_${i}"), getSingleValue(data, IndexDesc(i.toString)))
- getSingleValue(data, IndexDesc(i.toString)).map { v =>
- (append(path, s"${a.desc}_${i}"), Some(v))
- }
- }.toList
- }
- case r: FieldRangeDesc => {
- (r.startField, r.endField) match {
- case (si: IndexDesc, ei: IndexDesc) => {
- (si.index to ei.index).flatMap { i =>
- (append(path, s"${r.desc}_${i}"), getSingleValue(data, IndexDesc(i.toString)))
- getSingleValue(data, IndexDesc(i.toString)).map { v =>
- (append(path, s"${r.desc}_${i}"), Some(v))
- }
- }.toList
- }
- case _ => None
- }
- }
- case _ => None
- }
- }
- }
- }
- }
- }
- case selector: FunctionOperationExpr => {
- val args: Array[Option[Any]] = selector.args.map { arg =>
- arg.calculate(existExprValueMap)
- }.toArray
- pathDatas.flatMap { pathData =>
- val (path, data) = pathData
- data match {
- case Some(d: String) => {
- val res = FunctionUtil.invoke(selector.func, Some(d) +: args)
- val residx = res.zipWithIndex
- residx.map { vi =>
- val (v, i) = vi
- val step = if (i == 0) s"${selector.desc}" else s"${selector.desc}_${i}"
- (append(path, step), v)
- }
- }
- case _ => None
- }
- }
- }
- case selector: FilterSelectExpr => { // filter means selecting the items that fit the condition
- pathDatas.flatMap { pathData =>
- val (path, data) = pathData
- data match {
- case Some(row: Row) => {
- // right value could not be selection
- val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
- (0 until row.size).flatMap { i =>
- val dt = getSingleValue(data, IndexDesc(i.toString))
- val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
- val partValueMap = lmap ++ rmap
- selector.calculate(partValueMap) match {
- case Some(true) => Some((append(path, s"${selector.desc}_${i}"), dt))
- case _ => None
- }
- }
- }
- case Some(d: Map[String, Any]) => {
- val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
- d.keySet.flatMap { k =>
- val dt = getSingleValue(data, FieldDesc(k))
- val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
- val partValueMap = lmap ++ rmap
- selector.calculate(partValueMap) match {
- case Some(true) => Some((append(path, s"${selector.desc}_${k}"), dt))
- case _ => None
- }
- }
- }
- case Some(d: Seq[Any]) => {
- val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
- (0 until d.size).flatMap { i =>
- val dt = getSingleValue(data, IndexDesc(i.toString))
- val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
- val partValueMap = lmap ++ rmap
- selector.calculate(partValueMap) match {
- case Some(true) => Some((append(path, s"${selector.desc}_${i}"), dt))
- case _ => None
- }
- }
- }
- }
- }
- }
- case _ => {
- (expr.desc :: Nil, expr.calculate(existExprValueMap)) :: Nil
- }
- }
- } match {
- case Success(v) => v
- case _ => Nil
- }
- }
-
- private def calcExprsValues(data: Option[Any], exprs: Iterable[Expr], existExprValueMap: Map[String, Any]): List[Map[String, Any]] = {
- val selectionValues: Map[String, List[(List[String], Any)]] = exprs.map { expr =>
- (expr._id, calcExprValues((Nil, data) :: Nil, expr, existExprValueMap).flatMap { pair =>
- pair._2 match {
- case Some(v) => Some((pair._1, v))
- case _ => None
- }
- })
- }.toMap
- // if exprs is empty, return an empty value map for each row
- if (selectionValues.isEmpty) List(Map[String, Any]())
- else SchemaValueCombineUtil.cartesian(selectionValues)
- }
-
- // try to calculate some exprs from the data and initExprValueMap, generating a new expression value map
- // depends on the original data and the existing expr value map
- def genExprValueMaps(data: Option[Any], exprs: Iterable[Expr], initExprValueMap: Map[String, Any]): List[Map[String, Any]] = {
- val (selections, nonSelections) = exprs.partition(_.isInstanceOf[SelectionExpr])
- val valueMaps = calcExprsValues(data, selections, initExprValueMap)
- updateExprValueMaps(nonSelections, valueMaps)
- }
-
- // with the exprValueMap, calculate the expressions and update the expression value map
- // depends only on the existing expr value map; pure calculation, the original data is not needed
- def updateExprValueMaps(exprs: Iterable[Expr], exprValueMaps: List[Map[String, Any]]): List[Map[String, Any]] = {
- exprValueMaps.map { valueMap =>
- exprs.foldLeft(valueMap) { (em, expr) =>
- expr.calculate(em) match {
- case Some(v) => em + (expr._id -> v)
- case _ => em
- }
- }
- }
- }
-
-}
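The removed ExprValueUtil walked one data item (a Spark Row, a Map, or a Seq) along each selection expression and produced one value map per combination of multi-valued selections. A hedged sketch of the intended call pattern; the rule string and input row below are illustrative, and the parse is assumed to succeed:

    val parser = RuleParser()
    val rule = parser.parseAll(parser.rule, "$source.name = 'apache'").get
    val analyzer = RuleAnalyzer(rule)

    val row: Option[Any] = Some(Map[String, Any]("name" -> "apache"))
    val valueMaps: List[Map[String, Any]] =
      ExprValueUtil.genExprValueMaps(row, analyzer.sourceRuleExprs.cacheExprs, Map[String, Any]())
    // each resulting map binds expr._id -> calculated value for one combination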
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala
deleted file mode 100644
index 5ec143f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.rule.expr._
-
-case class RuleAnalyzer(rule: StatementExpr) extends Serializable {
-
- val constData = ""
- private val SourceData = "source"
- private val TargetData = "target"
-
- val constCacheExprs: Iterable[Expr] = rule.getCacheExprs(constData)
- private val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
- private val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)
-
- private val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
- private val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)
-
- val constFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(constData).toSet
- private val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet
- private val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet
-
- private val groupbyExprPairs: Seq[(Expr, Expr)] = rule.getGroupbyExprPairs((SourceData, TargetData))
- private val sourceGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._1)
- private val targetGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._2)
-
- val sourceRuleExprs: RuleExprs = RuleExprs(sourceGroupbyExprs, sourceCacheExprs,
- sourceFinalCacheExprs, sourcePersistExprs)
- val targetRuleExprs: RuleExprs = RuleExprs(targetGroupbyExprs, targetCacheExprs,
- targetFinalCacheExprs, targetPersistExprs)
-
-}
-
-
-// for a single data source
-// groupbyExprs: in the accuracy case, these exprs can serve as group-by exprs
-// Data keys for the accuracy case, generated from the equality statements, to improve calculation efficiency
-// cacheExprs: exprs whose values can be calculated independently and cached for later use
-// Cached for the finalCacheExprs calculation; they contain redundant values, so persisting them all would be wasteful
-// finalCacheExprs: the roots of cacheExprs plus the persistExprs, cached for later use
-// Cached for calculation, and can be saved for re-calculation in streaming mode
-// persistExprs: the expr values that should be persisted; only direct selection exprs are persistable
-// Persisted as records (e.g. of missing data), so they need to be readable as raw data
-case class RuleExprs(groupbyExprs: Seq[Expr],
- cacheExprs: Iterable[Expr],
- finalCacheExprs: Iterable[Expr],
- persistExprs: Iterable[Expr]
- ) extends Serializable {
- // for example: for a rule "$source.name = $target.name AND $source.age < $target.age + (3 * 4)"
- // in this rule, for the target data source, the targetRuleExprs looks like below
- // groupbyExprs: $target.name
- // cacheExprs: $target.name, $target.age, $target.age + (3 * 4)
- // finalCacheExprs: $target.name, $target.age + (3 * 4), $target.age
- // persistExprs: $target.name, $target.age
-}
\ No newline at end of file
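Taking the rule from the comment above, the analyzer splits a parsed statement into per-source expression groups; a sketch, assuming the parse succeeds:

    val parser = RuleParser()
    val result = parser.parseAll(parser.rule,
      "$source.name = $target.name AND $source.age < $target.age + (3 * 4)")
    val analyzer = RuleAnalyzer(result.get)
    analyzer.targetRuleExprs.groupbyExprs.map(_.desc)   // expected: $target.name
    analyzer.targetRuleExprs.persistExprs.map(_.desc)   // expected: $target.name, $target.age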
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala
deleted file mode 100644
index bbaf5cb..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.config.params.user._
-
-import scala.util.Failure
-//import org.apache.griffin.measure.rule.expr_old._
-import org.apache.griffin.measure.rule.expr._
-
-import scala.util.{Success, Try}
-
-
-case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {
-
- val ruleParser: RuleParser = RuleParser()
-
- def generateRule(): StatementExpr = {
- val rules = evaluateRuleParam.rules
- val statement = parseExpr(rules) match {
- case Success(se) => se
- case Failure(ex) => throw ex
- }
- statement
- }
-
- private def parseExpr(rules: String): Try[StatementExpr] = {
- Try {
- val result = ruleParser.parseAll(ruleParser.rule, rules)
- if (result.successful) result.get
- else throw new Exception("parse rule error!")
- }
- }
-
-}
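The removed RuleFactory was a thin wrapper: parse the configured rule string and return the StatementExpr, rethrowing on a parse failure. Usage was a one-liner (evaluateRuleParam comes from the user config, carrying the DSL string in its rules field):

    val factory = RuleFactory(evaluateRuleParam)
    val statement: StatementExpr = factory.generateRule()   // throws if the rule does not parse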
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala
deleted file mode 100644
index 55d9f45..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.rule.expr._
-
-import scala.util.parsing.combinator._
-
-case class RuleParser() extends JavaTokenParsers with Serializable {
-
- /**
- * BNF representation for grammar as below:
- *
- * <rule> ::= <logical-statement> [WHEN <logical-statement>]
- * rule: mapping-rule [WHEN when-rule]
- * - mapping-rule: the first-level operator had better not be OR or NOT, otherwise the group-by column cannot be found automatically
- * - when-rule: contains only general info about the data source, not info specific to each data row
- *
- * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
- * logical-statement: return boolean value
- * logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
- *
- * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
- * logical-expression example: $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
- *
- * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
- * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
- * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
- * range-expr example: ('3214', '4312', '60821'), (10, 15), ()
- *
- * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
- * math-expr example: $source.price * $target.count, "hello" + " " + "world" + 123
- *
- * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
- * <unary-opr> ::= "+" | "-"
- *
- * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
- *
- * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
- * selection example: $source.price, $source.json(), $source['state'], $source.numList[3], $target.json().mails['org' = 'apache'].names[*]
- *
- * <selection-head> ::= $source | $target
- *
- * <field-sel> ::= "." <field-string>
- *
- * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
- * <function-name> ::= <name-string>
- * <arg> ::= <math-expr>
- *
- * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
- * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
- * index-field-range: 2 means the 3rd item, (0, 3) means first 4 items, * means all items, 'age' means item 'age'
- * <index-field> ::= <index> | <field-quote> | <all-selection>
- * index: 0 ~ n means position from start, -1 ~ -n means position from end
- * <field-quote> ::= ' <field-string> ' | " <field-string> "
- *
- * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
- * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
- * filter-sel example: ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5 ]
- *
- * When a <math-expr> appears inside a selection, it must not contain a different <selection-head>, for example:
- * $source.tags[1+2] valid
- * $source.tags[$source.first] valid
- * $source.tags[$target.first] invalid
- * -- Such checking is the validator's job, not the parser's
- *
- *
- * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-none>
- * <literal-string> ::= <any-string>
- * <literal-number> ::= <integer> | <double>
- * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
- * <literal-boolean> ::= true | false
- * <literal-null> ::= null | undefined
- * <literal-none> ::= none
- *
- */
-
- object Keyword {
- def WhenKeywords: Parser[String] = """(?i)when""".r
- def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
- def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
- def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
- def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
- def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
- }
- import Keyword._
-
- object Operator {
- def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
- def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
- def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
- def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
- def RangeOpr: Parser[String] = RangeKeywords
-
- def UnaryMathOpr: Parser[String] = "+" | "-"
- def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
- def BinaryMathOpr2: Parser[String] = "+" | "-"
-
- def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
-
- def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
- def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
- def Dot: Parser[String] = "."
- def AllSelection: Parser[String] = "*"
- def SQuote: Parser[String] = "'"
- def DQuote: Parser[String] = "\""
- def Comma: Parser[String] = ","
- }
- import Operator._
-
- object SomeString {
-// def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
- def AnyString: Parser[String] = """[^'\"]*""".r
- def SimpleFieldString: Parser[String] = """\w+""".r
- def FieldString: Parser[String] = """[\w\s]+""".r
- def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
- }
- import SomeString._
-
- object SomeNumber {
- def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
- def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
- def IndexNumber: Parser[String] = IntegerNumber
- }
- import SomeNumber._
-
- // -- literal --
- def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull | literialNone
- def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
- def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
- def literialTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
- def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
- def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r) ^^ { LiteralNullExpr(_) }
- def literialNone: Parser[LiteralNoneExpr] = """(?i)none""".r ^^ { LiteralNoneExpr(_) }
-
- // -- selection --
- // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
- def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
- case head ~ selectors => SelectionExpr(head, selectors)
- }
- def selector: Parser[SelectExpr] = (functionOperation | fieldSelect | indexFieldRangeSelect | filterSelect)
-
- def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
- // <field-sel> ::= "." <field-string>
- def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> SimpleFieldString ^^ {
- case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
- }
- // <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
- def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
- case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
- }
- def argument: Parser[MathExpr] = mathExpr
- // <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
- def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
- case ifrs => IndexFieldRangeSelectExpr(ifrs)
- }
- // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
- def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
- case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
- }
- // <index-field> ::= <index> | <field-quote> | <all-selection>
- // *a <math-expr> could be parsed here, but for simplicity it is not supported yet*
- def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
- // <field-quote> ::= ' <field-string> ' | " <field-string> "
- def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
- // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
- def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote ~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
- case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
- }
-
- // -- math --
- // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
- def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
- // <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
- // <unary-opr> ::= "+" | "-"
- def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
- case Nil ~ a => a
- case list ~ a => UnaryMathExpr(list, a)
- }
- // <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
- def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ unaryMathExpr) ^^ {
- case a ~ Nil => a
- case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
- }
- def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 ~ binaryMathExpr1) ^^ {
- case a ~ Nil => a
- case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
- }
- def mathExpr: Parser[MathExpr] = binaryMathExpr2
-
- // -- logical expression --
- // <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
- def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) <~ BracketPair._2 ^^ { RangeDesc(_) }
- // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
- def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
- case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
- } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
- case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
- } | mathExpr ^^ { LogicalSimpleExpr(_) }
-
- // -- logical statement --
- def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> logicalStatement <~ BracketPair._2
- def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ logicalFactor ^^ {
- case Nil ~ a => a
- case list ~ a => UnaryLogicalExpr(list, a)
- }
- def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
- case a ~ Nil => a
- case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
- }
- def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
- case a ~ Nil => a
- case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
- }
- // <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
- def logicalStatement: Parser[LogicalExpr] = orLogicalStatement
-
- // -- clause statement --
- def whereClause: Parser[WhereClauseExpr] = logicalStatement ^^ { WhereClauseExpr(_) }
- def whenClause: Parser[WhenClauseExpr] = WhenKeywords ~> logicalStatement ^^ { WhenClauseExpr(_) }
-
- // -- rule --
- // <rule> ::= <logical-statement> [WHEN <logical-statement>]
- def rule: Parser[StatementExpr] = whereClause ~ opt(whenClause) ^^ {
- case a ~ b => StatementExpr(a, b)
- }
-
-}
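The grammar above is driven through parseAll with the top-level rule production; a minimal sketch, reusing the IN example from the BNF comment:

    val parser = RuleParser()
    val res = parser.parseAll(parser.rule,
      "$source.id = $target.id AND $source.page_id IN ('3214', '4312', '60821')")
    if (res.successful) {
      val statement: StatementExpr = res.get
      // statement wraps the where clause and the optional when clause
    }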
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala
deleted file mode 100644
index ed3b3fc..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-object SchemaValueCombineUtil {
-
- // Map[String, List[(List[String], T)]]: Map[key, List[(path, value)]]
- def cartesian[T](valuesMap: Map[String, List[(List[String], T)]]): List[Map[String, T]] = {
- val fieldsList: List[(String, List[(List[String], T)])] = valuesMap.toList
-
- // List[key, List[(path, value)]] to List[(path, (key, value))]
- val valueList: List[(List[String], (String, T))] = fieldsList.flatMap { fields =>
- val (key, list) = fields
- list.map { pv =>
- val (path, value) = pv
- (path, (key, value))
- }
- }
-
- // 1. generate tree from value list, and return root node
- val root = TreeUtil.genRootTree(valueList)
-
- // 2. depth-first visit of the tree from the root, merging data into the value map list
- val valueMapList: List[Map[String, _]] = TreeUtil.mergeDatasIntoMap(root, Nil)
-
- // 3. cast the values back to T
- val result = valueMapList.map { mp =>
- mp.map { kv =>
- val (k, v) = kv
- (k, v.asInstanceOf[T])
- }
- }
-
- result
-
- }
-
-
- case class TreeNode(key: String, var datas: List[(String, _)]) {
- var children = List[TreeNode]()
- def addChild(node: TreeNode): Unit = children = children :+ node
- def mergeSelf(node: TreeNode): Unit = datas = datas ::: node.datas
- }
-
- object TreeUtil {
- private def genTree(path: List[String], datas: List[(String, _)]): Option[TreeNode] = {
- path match {
- case Nil => None
- case head :: tail => {
- genTree(tail, datas) match {
- case Some(child) => {
- val curNode = TreeNode(head, Nil)
- curNode.addChild(child)
- Some(curNode)
- }
- case _ => Some(TreeNode(head, datas))
- }
- }
- }
- }
-
- private def mergeTrees(trees: List[TreeNode], newTreeOpt: Option[TreeNode]): List[TreeNode] = {
- newTreeOpt match {
- case Some(newTree) => {
- trees.find(tree => tree.key == newTree.key) match {
- case Some(tree) => {
- // children merge
- for (child <- newTree.children) {
- tree.children = mergeTrees(tree.children, Some(child))
- }
- // self data merge
- tree.mergeSelf(newTree)
- trees
- }
- case _ => trees :+ newTree
- }
- }
- case _ => trees
- }
- }
-
- private def root(): TreeNode = TreeNode("", Nil)
-
- def genRootTree(values: List[(List[String], (String, _))]): TreeNode = {
- val rootNode = root()
- val nodeOpts = values.map(value => genTree(value._1, value._2 :: Nil))
- rootNode.children = nodeOpts.foldLeft(List[TreeNode]()) { (trees, treeOpt) =>
- mergeTrees(trees, treeOpt)
- }
- rootNode
- }
-
- private def add(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
- mapList1 ::: mapList2
- }
- private def multiply(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
- mapList1.flatMap { map1 =>
- mapList2.map { map2 =>
- map1 ++ map2
- }
- }
- }
-
- private def keysList(mapList: List[Map[String, _]]): List[String] = {
- val keySet = mapList match {
- case Nil => Set[String]()
- case head :: _ => head.keySet
- }
- keySet.toList
- }
-
- def mergeDatasIntoMap(root: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
- val childrenKeysMapDatas = root.children.foldLeft(Map[List[String], List[Map[String, _]]]()) { (keysMap, child) =>
- val childMdts = mergeDatasIntoMap(child, List[Map[String, _]]())
- childMdts match {
- case Nil => keysMap
- case _ => {
- val keys = keysList(childMdts)
- val afterList = keysMap.get(keys) match {
- case Some(list) => add(list, childMdts)
- case _ => childMdts
- }
- keysMap + (keys -> afterList)
- }
- }
- }
- val childrenMergeMaps = childrenKeysMapDatas.values.foldLeft(List[Map[String, _]]()) { (originList, list) =>
- originList match {
- case Nil => list
- case _ => multiply(originList, list)
- }
- }
- val result = mergeNodeChildrenDatasIntoMap(root, childrenMergeMaps)
- result
- }
-
- private def mergeNodeChildrenDatasIntoMap(node: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
- val datas: List[(String, (String, Any))] = node.children.flatMap { child =>
- child.datas.map(dt => (dt._1, (child.key, dt._2)))
- }
- val childrenDataKeys: Set[String] = datas.map(_._1).toSet
- val childrenDataLists: Map[String, List[(String, _)]] = datas.foldLeft(childrenDataKeys.map(k => (k, List[(String, _)]())).toMap) { (maps, data) =>
- maps.get(data._1) match {
- case Some(list) => maps + (data._1 -> (list :+ data._2))
- case _ => maps
- }
- }
-
- // multiply the data lists of different keys together
- childrenDataLists.foldLeft(mapDatas) { (mdts, klPair) =>
- val (key, list) = klPair
- mdts match {
- case Nil => list.map(pr => Map[String, Any]((key -> pr._2)))
- case _ => {
- list.flatMap { kvPair =>
- val (path, value) = kvPair
- mdts.map { mp =>
- mp + (key -> value)
- }
- }
- }
- }
- }
-
- }
- }
-
-
-
-
-}
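The removed cartesian combined per-key (path, value) lists: values sharing a path end up in the same output map, while values under divergent paths multiply into separate maps. A small worked example (the order of the resulting maps may vary):

    val valuesMap = Map(
      "a" -> List((List("p1"), 1)),
      "b" -> List((List("p1"), 2), (List("p2"), 3))
    )
    SchemaValueCombineUtil.cartesian(valuesMap)
    // expected: List(Map("a" -> 1, "b" -> 2), Map("a" -> 1, "b" -> 3))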
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/AdaptPhase.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/AdaptPhase.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/AdaptPhase.scala
new file mode 100644
index 0000000..26db78d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/AdaptPhase.scala
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+sealed trait AdaptPhase {}
+
+final case object PreProcPhase extends AdaptPhase {}
+
+final case object RunPhase extends AdaptPhase {}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
new file mode 100644
index 0000000..eb57838
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.rule.step._
+
+case class DataFrameOprAdaptor(adaptPhase: AdaptPhase) extends RuleAdaptor {
+
+ def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
+ DfOprStep(getName(param), getRule(param), getDetails(param),
+ getPersistType(param), getUpdateDataSource(param)) :: Nil
+ }
+ def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
+ ruleStep match {
+ case rs @ DfOprStep(_, _, _, _, _) => rs :: Nil
+ case _ => Nil
+ }
+ }
+
+ def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
+ param.get(_name) match {
+ case Some(name) => name.toString :: Nil
+ case _ => Nil
+ }
+ }
+
+}
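A sketch of the adaptor's contract; note that the helpers it calls (getName, getRule, getDetails, getPersistType, getUpdateDataSource) and the _name key live in the RuleAdaptor trait outside this diff, so the param keys below ("name", "rule") are assumptions:

    val adaptor = DataFrameOprAdaptor(RunPhase)
    // hypothetical param map; key names follow the assumed RuleAdaptor conventions
    val steps: Seq[RuleStep] = adaptor.genRuleStep(Map("name" -> "out_table", "rule" -> "some.df.opr"))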
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
new file mode 100644
index 0000000..2a189d4
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -0,0 +1,349 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.dsl.analyzer._
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.dsl.parser.GriffinDslParser
+import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+case class GriffinDslAdaptor(dataSourceNames: Seq[String],
+ functionNames: Seq[String],
+ adaptPhase: AdaptPhase
+ ) extends RuleAdaptor {
+
+ object StepInfo {
+ val _Name = "name"
+ val _PersistType = "persist.type"
+ val _UpdateDataSource = "update.data.source"
+ def getNameOpt(param: Map[String, Any]): Option[String] = param.get(_Name).map(_.toString)
+ def getPersistType(param: Map[String, Any]): PersistType = PersistType(param.getString(_PersistType, ""))
+ def getUpdateDataSourceOpt(param: Map[String, Any]): Option[String] = param.get(_UpdateDataSource).map(_.toString)
+ }
+ object AccuracyInfo {
+ val _Source = "source"
+ val _Target = "target"
+ val _MissRecords = "miss.records"
+ val _Accuracy = "accuracy"
+ val _Miss = "miss"
+ val _Total = "total"
+ val _Matched = "matched"
+ }
+ object ProfilingInfo {
+ val _Source = "source"
+ val _Profiling = "profiling"
+ }
+
+ def getNameOpt(param: Map[String, Any], key: String): Option[String] = param.get(key).map(_.toString)
+ def resultName(param: Map[String, Any], key: String): String = {
+ val nameOpt = param.get(key) match {
+ case Some(prm: Map[String, Any]) => StepInfo.getNameOpt(prm)
+ case _ => None
+ }
+ nameOpt.getOrElse(key)
+ }
+ def resultPersistType(param: Map[String, Any], key: String, defPersistType: PersistType): PersistType = {
+ param.get(key) match {
+ case Some(prm: Map[String, Any]) => StepInfo.getPersistType(prm)
+ case _ => defPersistType
+ }
+ }
+ def resultUpdateDataSourceOpt(param: Map[String, Any], key: String): Option[String] = {
+ param.get(key) match {
+ case Some(prm: Map[String, Any]) => StepInfo.getUpdateDataSourceOpt(prm)
+ case _ => None
+ }
+ }
+
+ val _dqType = "dq.type"
+
+ protected def getDqType(param: Map[String, Any]) = DqType(param.getString(_dqType, ""))
+
+ val filteredFunctionNames = functionNames.filter { fn =>
+ fn.matches("""^[a-zA-Z_]\w*$""")
+ }
+ val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames)
+
+ def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
+ GriffinDslStep(getName(param), getRule(param), getDqType(param), getDetails(param)) :: Nil
+ }
+
+ def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
+ val dqType = getDqType(param)
+ param.get(_name) match {
+ case Some(name) => {
+ dqType match {
+ case AccuracyType => {
+ Seq[String](
+ resultName(param, AccuracyInfo._MissRecords),
+ resultName(param, AccuracyInfo._Accuracy)
+ )
+ }
+ case ProfilingType => {
+ Seq[String](
+ resultName(param, ProfilingInfo._Profiling)
+ )
+ }
+ case TimelinessType => {
+ Nil
+ }
+ case _ => Nil
+ }
+ }
+ case _ => Nil
+ }
+ }
+
+ def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
+ ruleStep match {
+ case rs @ GriffinDslStep(_, rule, dqType, _) => {
+ val exprOpt = try {
+ val result = parser.parseRule(rule, dqType)
+ if (result.successful) Some(result.get)
+ else {
+ warn(s"adapt concrete rule step warn: parse rule [ ${rule} ] fails: ${result}")
+ None
+ }
+ } catch {
+ case e: Throwable => {
+ error(s"adapt concrete rule step error: ${e.getMessage}")
+ None
+ }
+ }
+
+ exprOpt match {
+ case Some(expr) => {
+ try {
+ transConcreteRuleSteps(rs, expr)
+ } catch {
+ case e: Throwable => {
+ error(s"trans concrete rule step error: ${e.getMessage}")
+ Nil
+ }
+ }
+ }
+ case _ => Nil
+ }
+ }
+ case _ => Nil
+ }
+ }
+
+ private def transConcreteRuleSteps(ruleStep: GriffinDslStep, expr: Expr
+ ): Seq[ConcreteRuleStep] = {
+ val details = ruleStep.details
+ ruleStep.dqType match {
+ case AccuracyType => {
+ val sourceName = getNameOpt(details, AccuracyInfo._Source) match {
+ case Some(name) => name
+ case _ => dataSourceNames.head
+ }
+ val targetName = getNameOpt(details, AccuracyInfo._Target) match {
+ case Some(name) => name
+ case _ => dataSourceNames.tail.head
+ }
+ val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+
+
+ if (!checkDataSourceExists(sourceName)) {
+ Nil
+ } else {
+ // 1. miss record
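+ // when the target table is absent, every source row counts as a miss;
+ // otherwise LEFT JOIN on the rule and keep rows whose target side is all NULL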
+ val missRecordsSql = if (!checkDataSourceExists(targetName)) {
+ val selClause = s"`${sourceName}`.*"
+ s"SELECT ${selClause} FROM `${sourceName}`"
+ } else {
+ val selClause = s"`${sourceName}`.*"
+ val onClause = expr.coalesceDesc
+ val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+ s"${sel.desc} IS NULL"
+ }.mkString(" AND ")
+ val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+ s"${sel.desc} IS NULL"
+ }.mkString(" AND ")
+ val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+ s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+ }
+ val missRecordsName = resultName(details, AccuracyInfo._MissRecords)
+ val missRecordsStep = SparkSqlStep(
+ missRecordsName,
+ missRecordsSql,
+ Map[String, Any](),
+ resultPersistType(details, AccuracyInfo._MissRecords, RecordPersistType),
+ resultUpdateDataSourceOpt(details, AccuracyInfo._MissRecords)
+ )
+
+ // 2. miss count
+ val missTableName = "_miss_"
+ val missColName = getNameOpt(details, AccuracyInfo._Miss).getOrElse(AccuracyInfo._Miss)
+ val missSql = {
+ s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsName}` GROUP BY `${GroupByColumn.tmst}`"
+ }
+ val missStep = SparkSqlStep(
+ missTableName,
+ missSql,
+ Map[String, Any](),
+ NonePersistType,
+ None
+ )
+
+ // 3. total count
+ val totalTableName = "_total_"
+ val totalColName = getNameOpt(details, AccuracyInfo._Total).getOrElse(AccuracyInfo._Total)
+ val totalSql = {
+ s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${GroupByColumn.tmst}`"
+ }
+ val totalStep = SparkSqlStep(
+ totalTableName,
+ totalSql,
+ Map[String, Any](),
+ NonePersistType,
+ None
+ )
+
+ // 4. accuracy metric
+ val matchedColName = getNameOpt(details, AccuracyInfo._Matched).getOrElse(AccuracyInfo._Matched)
+ val accuracyMetricSql = {
+ s"""
+ |SELECT `${totalTableName}`.`${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`,
+ |`${missTableName}`.`${missColName}` AS `${missColName}`,
+ |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
+ |FROM `${totalTableName}` FULL JOIN `${missTableName}`
+ |ON `${totalTableName}`.`${GroupByColumn.tmst}` = `${missTableName}`.`${GroupByColumn.tmst}`
+ """.stripMargin
+ }
+ val accuracyMetricName = resultName(details, AccuracyInfo._Accuracy)
+ val accuracyMetricStep = SparkSqlStep(
+ accuracyMetricName,
+ accuracyMetricSql,
+ details,
+ // resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType)
+ NonePersistType,
+ None
+ )
+
+ // 5. accuracy metric filter
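+ // hand the joined miss/total table to a df-opr named "accuracy" (defined
+ // elsewhere), expected to derive the matched count before persisting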
+ val accuracyStep = DfOprStep(
+ accuracyMetricName,
+ "accuracy",
+ Map[String, Any](
+ ("df.name" -> accuracyMetricName),
+ ("miss" -> missColName),
+ ("total" -> totalColName),
+ ("matched" -> matchedColName),
+ ("tmst" -> GroupByColumn.tmst)
+ ),
+ resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType),
+ None
+ )
+
+ missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: accuracyStep :: Nil
+ }
+ }
+ case ProfilingType => {
+ val sourceName = getNameOpt(details, ProfilingInfo._Source) match {
+ case Some(name) => name
+ case _ => dataSourceNames.head
+ }
+ val analyzer = ProfilingAnalyzer(expr.asInstanceOf[ProfilingClause], sourceName)
+
+ val selExprDescs = analyzer.selectionExprs.map { sel =>
+ val alias = sel match {
+ case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+ case _ => ""
+ }
+ s"${sel.desc}${alias}"
+ }
+
+// val selClause = (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ")
+ val selClause = if (analyzer.containsAllSelectionExpr) {
+ selExprDescs.mkString(", ")
+ } else {
+ (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ")
+ }
+
+// val tailClause = analyzer.tailsExprs.map(_.desc).mkString(" ")
+ val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${GroupByColumn.tmst}`") :: Nil, None)
+ val mergedGroupbyClause = tmstGroupbyClause.merge(analyzer.groupbyExprOpt match {
+ case Some(gbc) => gbc
+ case _ => GroupbyClause(Nil, None)
+ })
+ val groupbyClause = mergedGroupbyClause.desc
+ val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+ val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
+
+ if (!checkDataSourceExists(sourceName)) {
+ Nil
+ } else {
+ // 1. select statement
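+ // the user's group-by (if any) was merged with a mandatory group-by on
+ // tmst above, so the profiling metric is computed per tmst value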
+ val profilingSql = {
+// s"SELECT `${GroupByColumn.tmst}`, ${selClause} FROM ${sourceName} ${tailClause} GROUP BY `${GroupByColumn.tmst}`"
+ s"SELECT ${selClause} FROM ${sourceName} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+ }
+ val profilingMetricName = resultName(details, ProfilingInfo._Profiling)
+ val profilingStep = SparkSqlStep(
+ profilingMetricName,
+ profilingSql,
+ details,
+ resultPersistType(details, ProfilingInfo._Profiling, MetricPersistType),
+ None
+ )
+
+ // 2. clear processed data
+// val clearDataSourceStep = DfOprStep(
+// s"${sourceName}_clear",
+// "clear",
+// Map[String, Any](
+// ("df.name" -> sourceName)
+// ),
+// NonePersistType,
+// Some(sourceName)
+// )
+//
+// profilingStep :: clearDataSourceStep :: Nil
+
+ profilingStep :: Nil
+ }
+
+ }
+ case TimelinessType => {
+ Nil
+ }
+ case _ => Nil
+ }
+ }
+
+ private def checkDataSourceExists(name: String): Boolean = {
+ try {
+ RuleAdaptorGroup.dataChecker.existDataSourceName(name)
+ } catch {
+ case e: Throwable => {
+ error(s"check data source exists error: ${e.getMessage}")
+ false
+ }
+ }
+ }
+
+}
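
Concretely, for an accuracy rule like `source.id = target.id` over data sources `source` and `target`, the adaptor unrolls into a chain of steps along these lines (the tmst column is abbreviated here; its actual name comes from GroupByColumn.tmst, which this diff does not show, and the ON clause is the rule's coalesced description):

    -- 1. miss records (default table name "miss.records")
    SELECT `source`.* FROM `source` LEFT JOIN `target`
      ON `source`.`id` = `target`.`id`
      WHERE (NOT (`source`.`id` IS NULL)) AND (`target`.`id` IS NULL)
    -- 2. miss count and 3. total count, one row per tmst
    SELECT `tmst`, COUNT(*) AS `miss`  FROM `miss.records` GROUP BY `tmst`
    SELECT `tmst`, COUNT(*) AS `total` FROM `source`       GROUP BY `tmst`
    -- 4. FULL JOIN of the two counts on tmst, then 5. the df-opr
    --    "accuracy" step that fills in `matched` and persists the metric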
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
new file mode 100644
index 0000000..744f52a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import java.util.concurrent.atomic.AtomicLong
+
+
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.rule.step.{ConcreteRuleStep, RuleStep}
+import org.apache.griffin.measure.rule.dsl.{DslType, PersistType}
+
+trait RuleAdaptor extends Loggable with Serializable {
+
+ val adaptPhase: AdaptPhase
+
+ val _name = "name"
+ val _rule = "rule"
+ val _persistType = "persist.type"
+ val _updateDataSource = "update.data.source"
+ val _details = "details"
+
+ protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString
+ protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString
+ protected def getPersistType(param: Map[String, Any]) = PersistType(param.getOrElse(_persistType, "").toString)
+ protected def getUpdateDataSource(param: Map[String, Any]) = param.get(_updateDataSource).map(_.toString)
+ protected def getDetails(param: Map[String, Any]) = param.get(_details) match {
+ case Some(dt: Map[String, Any]) => dt
+ case _ => Map[String, Any]()
+ }
+
+ def getTempSourceNames(param: Map[String, Any]): Seq[String]
+
+ def genRuleStep(param: Map[String, Any]): Seq[RuleStep]
+ def genConcreteRuleStep(param: Map[String, Any]): Seq[ConcreteRuleStep] = {
+ genRuleStep(param).flatMap { rs =>
+ adaptConcreteRuleStep(rs)
+ }
+ }
+ protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep]
+
+}
+
+object RuleStepNameGenerator {
+ private val counter: AtomicLong = new AtomicLong(0L)
+ private val head: String = "rs"
+
+ def genName: String = {
+ s"${head}${increment}"
+ }
+
+ private def increment: Long = {
+ counter.incrementAndGet()
+ }
+}
\ No newline at end of file
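
RuleStepNameGenerator backs getName: any rule param without an explicit "name" gets a unique generated table name. A self-contained sketch of the same pattern:

    import java.util.concurrent.atomic.AtomicLong

    object NameGen {
      private val counter = new AtomicLong(0L)
      // AtomicLong makes this safe to call from concurrent rule adaptation
      def next(prefix: String = "rs"): String = s"${prefix}${counter.incrementAndGet()}"
    }

    NameGen.next()   // "rs1"
    NameGen.next()   // "rs2"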
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
new file mode 100644
index 0000000..237902a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
@@ -0,0 +1,105 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.check.DataChecker
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.step._
+import org.apache.spark.sql.SQLContext
+
+import scala.collection.mutable.{Map => MutableMap}
+
+object RuleAdaptorGroup {
+
+ val _dslType = "dsl.type"
+
+ var dataSourceNames: Seq[String] = _
+ var functionNames: Seq[String] = _
+
+ var dataChecker: DataChecker = _
+
+ def init(sqlContext: SQLContext, dsNames: Seq[String]): Unit = {
+ val functions = sqlContext.sql("show functions")
+ functionNames = functions.map(_.getString(0)).collect
+ dataSourceNames = dsNames
+
+ dataChecker = DataChecker(sqlContext)
+ }
+
+ private def getDslType(param: Map[String, Any], defDslType: DslType) = {
+ val dt = DslType(param.getOrElse(_dslType, "").toString)
+ dt match {
+ case UnknownDslType => defDslType
+ case _ => dt
+ }
+ }
+
+ private def genRuleAdaptor(dslType: DslType, dsNames: Seq[String], adaptPhase: AdaptPhase): Option[RuleAdaptor] = {
+ dslType match {
+ case SparkSqlType => Some(SparkSqlAdaptor(adaptPhase))
+ case DfOprType => Some(DataFrameOprAdaptor(adaptPhase))
+ case GriffinDslType => Some(GriffinDslAdaptor(dsNames, functionNames, adaptPhase))
+ case _ => None
+ }
+ }
+
+// def genRuleSteps(evaluateRuleParam: EvaluateRuleParam): Seq[RuleStep] = {
+// val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else evaluateRuleParam.dslType
+// val defaultDslType = DslType(dslTypeStr)
+// val rules = evaluateRuleParam.rules
+// var dsNames = dataSourceNames
+// val steps = rules.flatMap { param =>
+// val dslType = getDslType(param)
+// genRuleAdaptor(dslType) match {
+// case Some(ruleAdaptor) => ruleAdaptor.genRuleStep(param)
+// case _ => Nil
+// }
+// }
+// steps.foreach(println)
+// steps
+// }
+
+ def genConcreteRuleSteps(evaluateRuleParam: EvaluateRuleParam,
+ adaptPhase: AdaptPhase
+ ): Seq[ConcreteRuleStep] = {
+ val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else evaluateRuleParam.dslType
+ val defaultDslType = DslType(dslTypeStr)
+ val ruleParams = evaluateRuleParam.rules
+ genConcreteRuleSteps(ruleParams, defaultDslType, adaptPhase)
+ }
+
+ def genConcreteRuleSteps(ruleParams: Seq[Map[String, Any]],
+ defDslType: DslType, adaptPhase: AdaptPhase
+ ): Seq[ConcreteRuleStep] = {
+ val (steps, dsNames) = ruleParams.foldLeft((Seq[ConcreteRuleStep](), dataSourceNames)) { (res, param) =>
+ val (preSteps, preNames) = res
+ val dslType = getDslType(param, defDslType)
+ val (curSteps, curNames) = genRuleAdaptor(dslType, preNames, adaptPhase) match {
+ case Some(ruleAdaptor) => (ruleAdaptor.genConcreteRuleStep(param), preNames ++ ruleAdaptor.getTempSourceNames(param))
+ case _ => (Nil, preNames)
+ }
+ (preSteps ++ curSteps, curNames)
+ }
+ steps
+ }
+
+
+}
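
The foldLeft in genConcreteRuleSteps threads two accumulators through the rule list: the concrete steps produced so far, and the table names later rules may reference (the initial data sources plus every temp name emitted upstream). A stripped-down model of that threading:

    // each rule contributes its steps and exposes its temp table name
    case class Rule(name: String)
    val dataSources = Seq("source", "target")
    val rules = Seq(Rule("miss.records"), Rule("accuracy"))

    val (steps, visibleNames) =
      rules.foldLeft((Seq.empty[String], dataSources)) { case ((ss, names), r) =>
        (ss :+ s"step for ${r.name}", names :+ r.name)
      }
    // visibleNames == Seq("source", "target", "miss.records", "accuracy")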
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
new file mode 100644
index 0000000..78121fa
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.rule.step._
+
+case class SparkSqlAdaptor(adaptPhase: AdaptPhase) extends RuleAdaptor {
+
+ def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
+ SparkSqlStep(getName(param), getRule(param), getDetails(param),
+ getPersistType(param), getUpdateDataSource(param)) :: Nil
+ }
+ def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
+ ruleStep match {
+ case rs @ SparkSqlStep(name, rule, details, persistType, udsOpt) => {
+ adaptPhase match {
+ case PreProcPhase => rs :: Nil
+ case RunPhase => {
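+ // at run phase, inject the tmst column and group by it, so the
+ // user's SQL is evaluated separately per tmst value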
+ val repSel = rule.replaceFirst("(?i)select", s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`,")
+ val groupbyRule = repSel.concat(s" GROUP BY `${GroupByColumn.tmst}`")
+ val nrs = SparkSqlStep(name, groupbyRule, details, persistType, udsOpt)
+ nrs :: Nil
+ }
+ }
+ }
+ case _ => Nil
+ }
+ }
+
+ def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
+ param.get(_name) match {
+ case Some(name) => name.toString :: Nil
+ case _ => Nil
+ }
+ }
+
+}
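
The run-phase rewrite above is a plain string transformation; a quick trace, assuming GroupByColumn.tmst resolves to "__tmst" (its real value is defined outside this diff):

    val tmst = "__tmst"
    val rule = "select count(*) from src"
    val rewritten = rule
      .replaceFirst("(?i)select", s"SELECT `${tmst}` AS `${tmst}`,")
      .concat(s" GROUP BY `${tmst}`")
    // "SELECT `__tmst` AS `__tmst`, count(*) from src GROUP BY `__tmst`"

Note the GROUP BY is appended blindly, so the rewrite appears to assume the incoming rule carries no trailing clauses of its own.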
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
new file mode 100644
index 0000000..ac27403
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl
+
+import scala.util.matching.Regex
+
+
+sealed trait DqType {
+ val regex: Regex
+ val desc: String
+}
+
+object DqType {
+ private val dqTypes: List[DqType] = List(AccuracyType, ProfilingType, TimelinessType, UnknownType)
+ def apply(ptn: String): DqType = {
+ dqTypes.find(tp => ptn match {
+ case tp.regex() => true
+ case _ => false
+ }).getOrElse(UnknownType)
+ }
+ def unapply(pt: DqType): Option[String] = Some(pt.desc)
+}
+
+final case object AccuracyType extends DqType {
+ val regex = "^(?i)accuracy$".r
+ val desc = "accuracy"
+}
+
+final case object ProfilingType extends DqType {
+ val regex = "^(?i)profiling$".r
+ val desc = "profiling$"
+}
+
+final case object TimelinessType extends DqType {
+ val regex = "^(?i)timeliness$".r
+ val desc = "timeliness"
+}
+
+final case object UnknownType extends DqType {
+ val regex = "".r
+ val desc = "unknown"
+}
\ No newline at end of file
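
The apply factory leans on Scala's regex extractor: `case tp.regex()` only fires when the whole string matches, which is why UnknownType's empty pattern cannot accidentally claim a non-empty input. The mechanism in isolation:

    val Accuracy = "^(?i)accuracy$".r
    "Accuracy" match {
      case Accuracy() => println("matched")   // extractor requires a full match
      case _          => println("no match")
    }
    // DqType("ACCURACY") resolves to AccuracyType; DqType("other") to UnknownType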
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
new file mode 100644
index 0000000..cfda393
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl
+
+import scala.util.matching.Regex
+
+
+sealed trait DslType {
+ val regex: Regex
+ val desc: String
+}
+
+object DslType {
+ private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType, DfOprType, UnknownDslType)
+ def apply(ptn: String): DslType = {
+ dslTypes.find(tp => ptn match {
+ case tp.regex() => true
+ case _ => false
+ }).getOrElse(UnknownDslType)
+ }
+ def unapply(pt: DslType): Option[String] = Some(pt.desc)
+}
+
+final case object SparkSqlType extends DslType {
+ val regex = "^(?i)spark-?sql$".r
+ val desc = "spark-sql"
+}
+
+final case object DfOprType extends DslType {
+ val regex = "^(?i)df-?opr$".r
+ val desc = "df-opr"
+}
+
+final case object GriffinDslType extends DslType {
+ val regex = "^(?i)griffin-?dsl$".r
+ val desc = "griffin-dsl"
+}
+
+final case object UnknownDslType extends DslType {
+ val regex = "".r
+ val desc = "unknown"
+}
\ No newline at end of file
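
DslType resolves the same way, with "-?" making the hyphen optional and "(?i)" the match case-insensitive:

    DslType("spark-sql")   // SparkSqlType
    DslType("SparkSQL")    // SparkSqlType
    DslType("df-opr")      // DfOprType
    DslType("anything")    // UnknownDslType (fallback)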